import {Inject, Injectable} from '@angular/core';
import {ENVIRONMENT} from '@forlabs/api-bridge';
import {Store} from '@ngrx/store';
import jwt_decode from 'jwt-decode';
import {buffer, ReplaySubject, take, tap, throttleTime} from 'rxjs';
import {mercureActionProcessEvent, mercureActionScheduleReconnect, MercureEvent} from '../+state/_mercure/actions';
import {metaActionInit} from '../+state/_meta/actions';
import {FollowmeDynamicEnvironment} from '../environment/followme-dynamic-environment';
import {CurrentUserService} from './current-user.service';
import {MercureFallbackService} from './mercure-fallback.service';


// Until we switch to a more recent version of ES
const objectGroupBy = <T>(
  array: T[],
  callback: (item: T, index: number, array: T[]) => string,
): {[key: string]: T[]} => array.reduce(
  (acc = {}, item, index, array) => {
    const key = callback(item, index, array);
    acc[key] ??= [];
    acc[key].push(item);
    return acc;
  },
  {},
);

@Injectable({providedIn: 'root'})
export class MercureService {
  private readonly bufferDuration = 5000;
  private readonly maxEventsInBufferBeforeRefresh = 100;

  private eventSource: EventSource;

  // We want to buffer events that we receive so that we can treat them by batches instead of one by one (see handleEvents method).
  // For that, we use events$ as our observable of raw events, emitted as soon as they are received by the event source.
  // Then we use bufferedEvents$ to put them inside a buffer that groups them:
  //  - when the buffer is empty and we receive an event, start a timer (bufferDuration).
  //  - every even we receive during this timer is going to be stored in a buffer.
  //  - at the end of the timer, emit the buffer (an array of events) and empty the buffer.
  //  - start again when we receive another event in the future.
  private readonly events$: ReplaySubject<MercureEvent> = new ReplaySubject(1);
  private readonly bufferedEvents$ = this.events$.pipe(
    buffer(
      // The argument of "buffer(...)" is an observable that determines when to emit the content of the buffer.
      // We use throttleTime with trailing emissions here to emit a little while after receiving a first message in the
      // buffer.
      this.events$.pipe(throttleTime(this.bufferDuration, undefined, {leading: false, trailing: true})),
    ),
  );

  constructor(
    private store: Store,
    private mercureFallbackService: MercureFallbackService,
    private currentUserService: CurrentUserService,
    @Inject(ENVIRONMENT) protected environment: FollowmeDynamicEnvironment,
  ) {
    this.bufferedEvents$.subscribe((events) => this.handleEvents(events));
  }

  private handleEvents(events: MercureEvent[]): void {
    // If we received a whole lot of events in a single buffer, most likely we're reconnecting after a while and we should
    // just refresh the app.
    if (events.length > this.maxEventsInBufferBeforeRefresh) {
      // TODO: Ideally, we should also throttle these. We don't really want to send one every few seconds when reconnecting
      //  takes a while to read mercure messages. At the very least we don't want to do it while the previous reload is
      //  still ongoing. We need to see how it works in prod env first before deciding on the priority of something like that.
      console.log(`Too many mercure events received at the same time (${events.length}), reloading data instead of handling them one by one.`);
      this.currentUserService.currentUser$.pipe(
        take(1),
        // We don't make sure that user is indeed defined, but it'd be really strange if it wasn't since we're
        // connected to the EventSource and, in FollowMe, that means we are authenticated.
        tap((user) => this.store.dispatch(metaActionInit({user}))),
      ).subscribe();
      return;
    }

    // If we received fewer events, still we should avoid duplicating them. Each of them may trigger redraws, but more
    // importantly some effects trigger API calls, like the update of a PatientStep triggering a call to its Patient.
    const eventsByIri = objectGroupBy(events, (event) => event['@id']);

    // For each iri we obtained, only dispatch the last event we received.
    for (const eventIri of Object.keys(eventsByIri)) {
      const eventsForIri = eventsByIri[eventIri];
      console.log(`Dispatching last mercure event for ${eventIri}` + (eventsForIri.length > 0 ? ` (skipped ${eventsForIri.length - 1}).` : '.'));
      this.store.dispatch(mercureActionProcessEvent({event: eventsForIri[eventsForIri.length - 1]}));
    }
  }

  private getEventSource(token: string, topics: string[]): EventSource {
    const finalUrl = new URL(this.environment.mercureUrl);
    for (const topic of topics) {
      finalUrl.searchParams.append('topic', topic);
    }
    finalUrl.searchParams.append('authorization', token);

    // https://symfony.com/doc/current/mercure.html#subscribing
    // Explicitly use the polyfill to support Headers
    // Override lastEventIdQueryParameterName to match the Mercure server expected parameter name
    // ==> We don't use the headers for authorization anymore, we use query parameters.
    // ==> Let's try without lastEventIdQueryParameterName and see if things work well, so that we can remove the polyfill.
    return new EventSource(finalUrl, {
      // headers: {
      //   'Authorization': 'Bearer ' + token,
      // },
      // lastEventIdQueryParameterName: 'lastEventID',
    });
  }

  public startEventSource(mercureJwt: string): boolean {
    try {
      if (
        this.eventSource?.readyState === EventSource.OPEN ||
          this.eventSource?.readyState === EventSource.CONNECTING
      ) {
        console.log('EventSource already connected or connecting');
        return true;
      }

      const decodedJwt = jwt_decode(mercureJwt);
      const topics = decodedJwt['mercure'].subscribe;
      this.eventSource = this.getEventSource(mercureJwt, topics);

      this.eventSource.onopen = () => {
        this.mercureFallbackService.disable();
      };

      this.eventSource.onmessage = (event) => {
        this.events$.next(JSON.parse(event.data));
        this.mercureFallbackService.disable();
      };

      this.eventSource.onerror = (error) => {
        console.error('MERCURE ERROR EVENT', error, this.eventSource);
        if (this.eventSource?.readyState === EventSource.CLOSED) {
          console.error('MERCURE ERROR EVENT: EVENTSOURCE IS CLOSED', error, this.eventSource);
          // Enable fallback mode (will be disabled when a new event is received)
          this.mercureFallbackService.enable();
          // Schedule a reconnect
          this.store.dispatch(mercureActionScheduleReconnect({jwt: mercureJwt}));
        }
      };

      return true;
    } catch (e) {
      console.error('MERCURE START ERROR', e);
      this.mercureFallbackService.enable();
      return false;
    }
  }

  public stopEventSource(): void {
    this.eventSource?.close();
  }
}
