import {
  from,
  interval,
  Observable,
  catchError,
  filter,
  mergeMap,
  exhaustMap,
  EMPTY,
  Subject,
} from "rxjs";
import { ConversationId } from "./conversation-id.model";
import { ConversationApiClient } from "./conversation.apiclient";
import { ConversationIdStore } from "./conversation-id.store";
import { MessagesFetcher } from "./messages.fetcher";
import { IncomingConversationBlock } from "./incoming.model";
import { ConversationUtil } from "./conversation.util";
import { CountDown } from "app/core";
import { takeUntilDestroyed } from "@angular/core/rxjs-interop";

/**
 * Manages the sending of outgoing messages and the retrieval of
 * conversation blocks, by means of a simple HTTP request polling strategy.
 *
 * A polling sequence is started after a user
 * message is sent, for at most 60 seconds. The bot always generates messages that are an answer to a
 * user message, hence it is not necessary to poll for conversation blocks once all bot answers to user
 * messages have been received.
 */
export class MessagesPollingFetcher implements MessagesFetcher {
  /**
   * Polling requests are only issued while this flag is set.
   * It is raised by {@link initialize} and lowered by {@link stop}.
   */
  private retrievalEffective = false;

  /**
   * The subject into which retrieved conversation blocks from the server are pushed.
   * It is a plain (non-replaying) subject: blocks emitted before a consumer subscribes are lost.
   */
  private readonly conversationBlocks$ = new Subject<IncomingConversationBlock>();

  /** Highest block timestamp seen so far; passed to the API client on every retrieval. */
  private lastTimestamp = 0;

  /**
   * Wires the polling pipeline and subscribes it immediately. No HTTP request is issued
   * until {@link initialize} has been called and the countdown allows it.
   *
   * Must be called within an Angular injection context, because `takeUntilDestroyed()`
   * is used without an explicit `DestroyRef`.
   *
   * @param pollingPeriodInMs Interval between requests, in milliseconds
   * @param pollingCountDown Counter for polling requests to be executed.
   * @param conversationApiClient Client used to retrieve conversation blocks.
   * @param conversationIdStore Source of the current conversation id; invalidated on 401/422.
   * @param conversationUtil Helper used to sort a batch and obtain its most recent timestamp.
   */
  constructor(
    private readonly pollingPeriodInMs: number,
    private readonly pollingCountDown: CountDown,
    private readonly conversationApiClient: ConversationApiClient,
    private readonly conversationIdStore: ConversationIdStore,
    private readonly conversationUtil: ConversationUtil
  ) {
    interval(this.pollingPeriodInMs)
      .pipe(
        // once the application is fully started, polling can start
        filter(() => this.retrievalEffective),
        // evaluated only for ticks that passed the previous filter, so the countdown
        // is not consumed while retrieval is disabled
        filter(() => this.pollingCountDown.countDown()),
        // a new request is sent only if the previous one is completed, ignoring the new request if not
        exhaustMap(() => this.fetchConversationBlocks()),
        // kept last so that the whole chain is torn down on destroy
        takeUntilDestroyed()
      )
      // the subject is used as the observer: it completes when the pipeline completes
      .subscribe(this.conversationBlocks$);
  }

  /** If unlocked before the application is fully started, consumers may miss the first messages */
  public initialize(): void {
    this.retrievalEffective = true;
  }

  /** Launch a new polling sequence by recharging the countdown. */
  public awaitIncomingMessages(): void {
    this.pollingCountDown.recharge();
  }

  /**
   * Stop the polling sequence: subsequent ticks are filtered out.
   * A request that is already in flight is not cancelled.
   */
  public stop(): void {
    this.retrievalEffective = false;
  }

  /**
   * Returns the stream of conversation blocks retrieved by the polling.
   *
   * The polling pipeline is subscribed in the constructor, independently of any subscription
   * made here; blocks emitted before a consumer subscribes are not replayed.
   * HTTP polling requests are executed only when a new message is sent,
   * and for a limited period of time.
   */
  public get(): Observable<IncomingConversationBlock> {
    return this.conversationBlocks$.asObservable();
  }

  /**
   * Fetches the conversation blocks added to the current conversation since the last fetch.
   * Each block of the conversation is pushed into the returned observable.
   *
   * Errors are logged and swallowed (the returned observable completes empty) so that the
   * outer polling stream keeps running. On HTTP 401 or 422 the stored conversation id is
   * invalidated.
   *
   * NOTE(review): assumes `conversationIdStore.get()` completes after emitting; if it were a
   * long-lived stream, `exhaustMap` in the constructor would ignore every later tick — confirm.
   */
  private fetchConversationBlocks(): Observable<IncomingConversationBlock> {
    return this.conversationIdStore.get().pipe(
      mergeMap((conversationId: ConversationId) =>
        this.conversationApiClient.retrieve(conversationId, this.lastTimestamp)
      ),
      mergeMap((blocks: IncomingConversationBlock[]) => {
        // presumably also orders `blocks` in place — confirm against ConversationUtil
        const mostRecentTimestamp = this.conversationUtil.sortBlocks(blocks);
        // Guard against a non-finite result (e.g. NaN for an empty batch): Math.max would
        // otherwise turn the cursor into NaN for every following request.
        if (Number.isFinite(mostRecentTimestamp)) {
          this.lastTimestamp = Math.max(this.lastTimestamp, mostRecentTimestamp);
        }
        return from(blocks);
      }),
      catchError((error) => {
        // Optional chaining: a null/undefined error must not throw from inside this handler,
        // which would propagate to the polling stream and terminate it.
        const status = error?.status;
        if (status === 401 || status === 422) {
          this.conversationIdStore.invalidate();
        }
        console.error("Unable to retrieve conversation blocks", error);
        return EMPTY; // on error, we don't want the polling observable to fail
      })
    );
  }
}
