import noop from '@splunk/dashboard-utils/noop';
import type { Subscription as ObservableSubscription } from 'rxjs';
import type {
    ErrorPayload,
    OnDataCallback,
    OnErrorCallback,
    OnCancelCallback,
    OnStartCallback,
    OnCompleteCallback,
    RequestParams,
    SearchData,
} from '@splunk/dashboard-types';

import { formatData } from '@splunk/datasource-utils';

import type DataSourceController from './DataSourceController';

/**
 * Object type whose properties can never hold a value.
 * Used for the `{}` placeholder that `Subscription.lastState` is set to on
 * construction and on cancel.
 */
type EmptyObject = Record<string, never>;

/**
 * Everything `Subscription.lastState` can hold: the latest data payload,
 * the latest error payload reported by the stream, or the `{}` placeholder.
 */
type LastState = SearchData | ErrorPayload | EmptyObject;

/**
 * A DataSource subscription.
 * This represents a DataSource -- Consumer binding.
 * Be aware that the binding can be n to n.
 */
class Subscription {
    /** id of the consumer this subscription reports to */
    consumerId: string;

    /** controller that issues requests and broadcasts events */
    dataSource: DataSourceController;

    /** params handed to `dataSource.request` on every (re)start */
    requestParams?: RequestParams;

    /** latest data or stream error payload; `{}` initially and after cancel */
    lastState: LastState;

    /**
     * handle of the stream opened by `startPolling`; `null` before the first
     * request and after the stream has been torn down
     */
    sub: ObservableSubscription | null;

    dataCallback?: OnDataCallback;

    errorCallback?: OnErrorCallback;

    cancelCallback?: OnCancelCallback;

    startCallback?: OnStartCallback;

    completeCallback?: OnCompleteCallback;

    /**
     *
     * @param {Object} dataSource DataSource controller
     * @param {String} consumerId Consumer id
     * @param {Object} initialRequestParams initial requestParams
     */
    constructor(
        dataSource: DataSourceController,
        consumerId: string,
        initialRequestParams?: RequestParams
    ) {
        this.consumerId = consumerId;
        this.dataSource = dataSource;
        this.requestParams = initialRequestParams;
        this.lastState = {};
        this.sub = null;
    }

    /**
     * get last state of this subscription
     * @returns {Object} last state, either be data or error
     */
    getLastState = (): LastState => {
        return this.lastState;
    };

    /**
     * start polling data
     *
     * Any stream left over from a previous call is unsubscribed first, so at
     * most one request is live per subscription. A setup error, invalid data,
     * a stream error or an exception thrown by `request` is reported through
     * `error`.
     */
    startPolling(): void {
        // Drop the previous stream before opening a new one; otherwise a second
        // call would overwrite the handle and leave the first stream running
        // with no way to stop it.
        if (this.sub) {
            this.sub.unsubscribe();
            this.sub = null;
        }
        this.start();
        if (this.dataSource.setupError) {
            this.error({
                level: 'error',
                message: this.dataSource.setupError,
            });
            return;
        }
        try {
            const ob = this.dataSource.request(this.requestParams);
            this.sub = ob.subscribe(
                ({ data, meta = {} }) => {
                    const formattedData = formatData(data);
                    if (formattedData != null) {
                        const payload = {
                            requestParams: this.requestParams,
                            data: formattedData,
                            meta,
                        };
                        this.lastState = payload;
                        this.progress(payload);
                    } else {
                        this.error({
                            level: 'error',
                            message: `${this.dataSource.def.type} returns invalid data`,
                        });
                    }
                },
                ({ level, message, meta = {} }) => {
                    const payload = {
                        level,
                        message,
                        meta,
                    };
                    this.lastState = payload;
                    this.error(payload);
                },
                () => {
                    this.done();
                }
            );
        } catch (ex) {
            // `ex` is not guaranteed to be an Error instance. Report whatever
            // was thrown so the consumer always gets an error/complete
            // notification after the start notification.
            this.error({
                level: 'error',
                message: ex instanceof Error ? ex.message : String(ex),
            });
        }
    }

    /**
     * cancel this subscription
     *
     * Stops the live stream (if any), drops the data/error callbacks, resets
     * the last state and then notifies the cancel callback.
     */
    cancel(): void {
        if (this.sub) {
            this.sub.unsubscribe();
            // forget the dead handle so it is not unsubscribed again later
            this.sub = null;
        }
        this.dataCallback = undefined;
        this.errorCallback = undefined;
        this.lastState = {};
        if (this.cancelCallback) {
            this.cancelCallback(this.consumerId);
        }
    }

    /**
     * refresh current subscription
     *
     * Re-issues the request with the current request params; the previous
     * stream is torn down by `startPolling`.
     */
    refresh(): void {
        this.startPolling();
    }

    /**
     * notify the start callback (if registered) that a request is starting
     */
    start(): void {
        if (this.startCallback) {
            this.startCallback(this.consumerId);
        }
    }

    /**
     * send back data and broadcast progress event
     * @param {Object} payload data payload to deliver
     */
    progress = (payload: SearchData): void => {
        if (this.dataCallback) {
            this.dataCallback(payload);
        }
        this.dataSource.broadcast({
            eventType: 'datasource.progress',
            targetId: this.dataSource.id,
            payload: {
                consumerId: this.consumerId,
                ...payload,
            },
        });
    };

    /**
     * send back error and broadcast error event,
     * then notify the complete callback
     * @param {Object} payload error payload to deliver
     */
    error = (payload: ErrorPayload): void => {
        if (this.errorCallback) {
            this.errorCallback(payload);
        }
        this.dataSource.broadcast({
            eventType: 'datasource.error',
            targetId: this.dataSource.id,
            payload: {
                consumerId: this.consumerId,
                requestParams: this.requestParams,
                error: payload,
            },
        });
        if (this.completeCallback) {
            this.completeCallback(this.consumerId);
        }
    };

    /**
     * broadcast done event, then notify the complete callback
     */
    done = (): void => {
        this.dataSource.broadcast({
            eventType: 'datasource.done',
            targetId: this.dataSource.id,
            payload: {
                consumerId: this.consumerId,
                // we send back the last state part of done event
                ...this.lastState,
            },
        });
        if (this.completeCallback) {
            this.completeCallback(this.consumerId);
        }
    };

    /**
     * register subscription start callback
     * @param {Function} callback start callback
     */
    onStart(callback: OnStartCallback): void {
        this.startCallback = callback;
    }

    /**
     * register subscription cancel callback
     * @param {Function} callback cancel callback
     */
    onCancel(callback: OnCancelCallback): void {
        this.cancelCallback = callback;
    }

    /**
     * register complete callback
     * @param {Function} callback complete callback
     */
    onComplete(callback: OnCompleteCallback): void {
        this.completeCallback = callback;
    }

    /**
     * Subscribe to DataSource data
     *
     * Stores the callbacks and starts polling. If a stream is already live it
     * is replaced by the new one.
     * @param {Function} onData data callback
     * @param {Function} onError error callback
     */
    subscribeToData({ onData = noop, onError = noop } = {}): void {
        this.dataCallback = onData;
        this.errorCallback = onError;
        this.startPolling();
    }

    /**
     * update request params for this subscription
     *
     * The running stream is torn down and a new request is issued with the
     * new params.
     * @param {Object} newRequestParams new request params
     */
    updateRequestParams(newRequestParams: RequestParams): void {
        this.requestParams = newRequestParams;
        this.startPolling();
    }
}

export default Subscription;
