From d5e0087f825faf89094ae639d88e26c18b3708c0 Mon Sep 17 00:00:00 2001 From: Joshua Li Date: Mon, 2 Dec 2024 20:01:48 -0800 Subject: [PATCH] [Discover] fix PPL to not throw error if aggregation query fails (#8992) Signed-off-by: Joshua Li --- .../query_enhancements/common/utils.test.ts | 6 +- .../query_enhancements/common/utils.ts | 2 +- .../search/ppl_async_search_strategy.ts | 6 +- .../server/search/ppl_search_strategy.test.ts | 372 ++++++++++++++++++ .../server/search/ppl_search_strategy.ts | 6 +- .../search/sql_async_search_strategy.ts | 6 +- .../server/search/sql_search_strategy.ts | 4 +- 7 files changed, 387 insertions(+), 15 deletions(-) create mode 100644 src/plugins/query_enhancements/server/search/ppl_search_strategy.test.ts diff --git a/src/plugins/query_enhancements/common/utils.test.ts b/src/plugins/query_enhancements/common/utils.test.ts index 39bbdc258bea..787cebb0c082 100644 --- a/src/plugins/query_enhancements/common/utils.test.ts +++ b/src/plugins/query_enhancements/common/utils.test.ts @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { handleFacetError } from './utils'; +import { throwFacetError } from './utils'; describe('handleFacetError', () => { const error = new Error('mock-error'); @@ -16,9 +16,9 @@ describe('handleFacetError', () => { data: error, }; - expect(() => handleFacetError(response)).toThrowError(); + expect(() => throwFacetError(response)).toThrowError(); try { - handleFacetError(response); + throwFacetError(response); } catch (err: any) { expect(err.message).toBe('test error message'); expect(err.name).toBe('400'); diff --git a/src/plugins/query_enhancements/common/utils.ts b/src/plugins/query_enhancements/common/utils.ts index 9b2bb9e3aacf..29e49b00eab0 100644 --- a/src/plugins/query_enhancements/common/utils.ts +++ b/src/plugins/query_enhancements/common/utils.ts @@ -42,7 +42,7 @@ export const removeKeyword = (queryString: string | undefined) => { return queryString?.replace(new RegExp('.keyword'), '') ?? ''; }; -export const handleFacetError = (response: any) => { +export const throwFacetError = (response: any) => { const error = new Error(response.data.body?.message ?? response.data.body ?? response.data); error.name = response.data.status ?? response.status ?? response.data.statusCode; (error as any).status = error.name; diff --git a/src/plugins/query_enhancements/server/search/ppl_async_search_strategy.ts b/src/plugins/query_enhancements/server/search/ppl_async_search_strategy.ts index 309c5fd522b6..2af66fb427c2 100644 --- a/src/plugins/query_enhancements/server/search/ppl_async_search_strategy.ts +++ b/src/plugins/query_enhancements/server/search/ppl_async_search_strategy.ts @@ -13,7 +13,7 @@ import { Query, } from '../../../data/common'; import { ISearchStrategy, SearchUsage } from '../../../data/server'; -import { buildQueryStatusConfig, getFields, handleFacetError, SEARCH_STRATEGY } from '../../common'; +import { buildQueryStatusConfig, getFields, throwFacetError, SEARCH_STRATEGY } from '../../common'; import { Facet } from '../utils'; export const pplAsyncSearchStrategyProvider = ( @@ -45,7 +45,7 @@ export const pplAsyncSearchStrategyProvider = ( request.body = { ...request.body, lang: SEARCH_STRATEGY.PPL }; const rawResponse: any = await pplAsyncFacet.describeQuery(context, request); - if (!rawResponse.success) handleFacetError(rawResponse); + if (!rawResponse.success) throwFacetError(rawResponse); const statusConfig = buildQueryStatusConfig(rawResponse); @@ -60,7 +60,7 @@ export const pplAsyncSearchStrategyProvider = ( request.params = { queryId: inProgressQueryId }; const queryStatusResponse = await pplAsyncJobsFacet.describeQuery(context, request); - if (!queryStatusResponse.success) handleFacetError(queryStatusResponse); + if (!queryStatusResponse.success) throwFacetError(queryStatusResponse); const queryStatus = queryStatusResponse.data?.status; logger.info(`pplAsyncSearchStrategy: JOB: ${inProgressQueryId} - STATUS: ${queryStatus}`); diff --git a/src/plugins/query_enhancements/server/search/ppl_search_strategy.test.ts b/src/plugins/query_enhancements/server/search/ppl_search_strategy.test.ts new file mode 100644 index 000000000000..ae8105180db8 --- /dev/null +++ b/src/plugins/query_enhancements/server/search/ppl_search_strategy.test.ts @@ -0,0 +1,372 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { + ILegacyClusterClient, + Logger, + RequestHandlerContext, + SharedGlobalConfig, +} from 'opensearch-dashboards/server'; +import { Observable, of } from 'rxjs'; +import { DATA_FRAME_TYPES, IOpenSearchDashboardsSearchRequest } from '../../../data/common'; +import { SearchUsage } from '../../../data/server'; +import * as utils from '../../common/utils'; +import * as facet from '../utils/facet'; +import { pplSearchStrategyProvider } from './ppl_search_strategy'; + +jest.mock('../../common/utils', () => ({ + ...jest.requireActual('../../common/utils'), + getFields: jest.fn(), +})); + +describe('pplSearchStrategyProvider', () => { + let config$: Observable; + let logger: Logger; + let client: ILegacyClusterClient; + let usage: SearchUsage; + const emptyRequestHandlerContext = ({} as unknown) as RequestHandlerContext; + + beforeEach(() => { + config$ = of({} as SharedGlobalConfig); + logger = ({ + error: jest.fn(), + } as unknown) as Logger; + client = {} as ILegacyClusterClient; + usage = { + trackSuccess: jest.fn(), + trackError: jest.fn(), + } as SearchUsage; + }); + + it('should return an object with a search method', () => { + const strategy = pplSearchStrategyProvider(config$, logger, client, usage); + expect(strategy).toHaveProperty('search'); + expect(typeof strategy.search).toBe('function'); + }); + + it('should handle successful search response', async () => { + const mockResponse = { + success: true, + data: { + schema: [ + { name: 'field1', type: 'long' }, + { name: 'field2', type: 'text' }, + ], + datarows: [ + [1, 'value1'], + [2, 'value2'], + ], + }, + took: 100, + }; + const mockFacet = ({ + describeQuery: jest.fn().mockResolvedValue(mockResponse), + } as unknown) as facet.Facet; + jest.spyOn(facet, 'Facet').mockImplementation(() => mockFacet); + (utils.getFields as jest.Mock).mockReturnValue([ + { name: 'field1', type: 'long' }, + { name: 'field2', type: 'text' }, + ]); + + const strategy = pplSearchStrategyProvider(config$, logger, client, usage); + const result = await strategy.search( + emptyRequestHandlerContext, + ({ + body: { query: { query: 'source = table', dataset: { id: 'test-dataset' } } }, + } as unknown) as IOpenSearchDashboardsSearchRequest, + {} + ); + + expect(result).toEqual({ + type: DATA_FRAME_TYPES.DEFAULT, + body: { + name: 'test-dataset', + fields: [ + { name: 'field1', type: 'long', values: [] }, + { name: 'field2', type: 'text', values: [] }, + ], + schema: [ + { name: 'field1', type: 'long', values: [] }, + { name: 'field2', type: 'text', values: [] }, + ], + size: 2, + }, + took: 100, + }); + expect(usage.trackSuccess).toHaveBeenCalledWith(100); + }); + + it('should handle failed search response', async () => { + const mockResponse = { + success: false, + data: { cause: 'Query failed' }, + took: 50, + }; + const mockFacet = ({ + describeQuery: jest.fn().mockResolvedValue(mockResponse), + } as unknown) as facet.Facet; + jest.spyOn(facet, 'Facet').mockImplementation(() => mockFacet); + + const strategy = pplSearchStrategyProvider(config$, logger, client, usage); + await expect( + strategy.search( + emptyRequestHandlerContext, + ({ + body: { query: { query: 'source = table' } }, + } as unknown) as IOpenSearchDashboardsSearchRequest, + {} + ) + ).rejects.toThrow(); + }); + + it('should handle exceptions', async () => { + const mockError = new Error('Something went wrong'); + const mockFacet = ({ + describeQuery: jest.fn().mockRejectedValue(mockError), + } as unknown) as facet.Facet; + jest.spyOn(facet, 'Facet').mockImplementation(() => mockFacet); + + const strategy = pplSearchStrategyProvider(config$, logger, client, usage); + await expect( + strategy.search( + emptyRequestHandlerContext, + ({ + body: { query: { query: 'source = table' } }, + } as unknown) as IOpenSearchDashboardsSearchRequest, + {} + ) + ).rejects.toThrow(mockError); + expect(logger.error).toHaveBeenCalledWith(`pplSearchStrategy: ${mockError.message}`); + expect(usage.trackError).toHaveBeenCalled(); + }); + + it('should throw error when describeQuery success is false', async () => { + const mockError = new Error('Something went wrong'); + const mockFacet = ({ + describeQuery: jest.fn().mockResolvedValue({ success: false, data: mockError }), + } as unknown) as facet.Facet; + jest.spyOn(facet, 'Facet').mockImplementation(() => mockFacet); + + const strategy = pplSearchStrategyProvider(config$, logger, client, usage); + await expect( + strategy.search( + emptyRequestHandlerContext, + ({ + body: { query: { query: 'source = table' } }, + } as unknown) as IOpenSearchDashboardsSearchRequest, + {} + ) + ).rejects.toThrowError(); + expect(logger.error).toHaveBeenCalledWith(expect.stringContaining(mockError.message)); + expect(usage.trackError).toHaveBeenCalled(); + }); + + it('should handle empty search response', async () => { + const mockResponse = { + success: true, + data: { + schema: [ + { name: 'field1', type: 'long' }, + { name: 'field2', type: 'text' }, + ], + datarows: [], + }, + took: 10, + }; + const mockFacet = ({ + describeQuery: jest.fn().mockResolvedValue(mockResponse), + } as unknown) as facet.Facet; + jest.spyOn(facet, 'Facet').mockImplementation(() => mockFacet); + (utils.getFields as jest.Mock).mockReturnValue([ + { name: 'field1', type: 'long' }, + { name: 'field2', type: 'text' }, + ]); + + const strategy = pplSearchStrategyProvider(config$, logger, client, usage); + const result = await strategy.search( + emptyRequestHandlerContext, + ({ + body: { query: { query: 'source = empty_table', dataset: { id: 'empty-dataset' } } }, + } as unknown) as IOpenSearchDashboardsSearchRequest, + {} + ); + + expect(result).toEqual({ + type: DATA_FRAME_TYPES.DEFAULT, + body: { + name: 'empty-dataset', + fields: [ + { name: 'field1', type: 'long', values: [] }, + { name: 'field2', type: 'text', values: [] }, + ], + schema: [ + { name: 'field1', type: 'long', values: [] }, + { name: 'field2', type: 'text', values: [] }, + ], + size: 0, + }, + took: 10, + }); + expect(usage.trackSuccess).toHaveBeenCalledWith(10); + }); + + it('should handle aggConfig when response succeeds', async () => { + const mockResponse = { + success: true, + data: { + schema: [ + { name: 'field1', type: 'long' }, + { name: 'field2', type: 'text' }, + ], + datarows: [ + [1, 'value1'], + [2, 'value2'], + ], + }, + took: 10, + }; + const mockFacet = ({ + describeQuery: jest.fn().mockResolvedValue(mockResponse), + } as unknown) as facet.Facet; + jest.spyOn(facet, 'Facet').mockImplementation(() => mockFacet); + (utils.getFields as jest.Mock).mockReturnValue([ + { name: 'field1', type: 'long' }, + { name: 'field2', type: 'text' }, + ]); + + const strategy = pplSearchStrategyProvider(config$, logger, client, usage); + const result = await strategy.search( + emptyRequestHandlerContext, + ({ + body: { + query: { query: 'source = empty_table', dataset: { id: 'empty-dataset' } }, + aggConfig: { + date_histogram: { + field: 'timestamp', + fixed_interval: '12h', + time_zone: 'America/Los_Angeles', + min_doc_count: 1, + }, + qs: { + '2': 'source = empty_table | stats count() by span(timestamp, 12h)', + }, + }, + }, + } as unknown) as IOpenSearchDashboardsSearchRequest, + {} + ); + + expect(result).toEqual({ + type: DATA_FRAME_TYPES.DEFAULT, + body: { + name: 'empty-dataset', + fields: [ + { name: 'field1', type: 'long', values: [] }, + { name: 'field2', type: 'text', values: [] }, + ], + schema: [ + { name: 'field1', type: 'long', values: [] }, + { name: 'field2', type: 'text', values: [] }, + ], + aggs: { + '2': [ + { key: 'value1', value: 1 }, + { key: 'value2', value: 2 }, + ], + }, + meta: { + date_histogram: { + field: 'timestamp', + fixed_interval: '12h', + time_zone: 'America/Los_Angeles', + min_doc_count: 1, + }, + qs: { '2': 'source = empty_table | stats count() by span(timestamp, 12h)' }, + }, + size: 2, + }, + took: 10, + }); + expect(usage.trackSuccess).toHaveBeenCalledWith(10); + }); + + it('should handle aggConfig when aggregation fails', async () => { + const mockResponse = { + success: true, + data: { + schema: [ + { name: 'field1', type: 'long' }, + { name: 'field2', type: 'text' }, + ], + datarows: [ + [1, 'value1'], + [2, 'value2'], + ], + }, + took: 10, + }; + const mockError = new Error('Something went wrong'); + const mockFacet = ({ + describeQuery: jest + .fn() + .mockResolvedValueOnce(mockResponse) + .mockResolvedValue({ success: false, data: mockError }), + } as unknown) as facet.Facet; + jest.spyOn(facet, 'Facet').mockImplementation(() => mockFacet); + (utils.getFields as jest.Mock).mockReturnValue([ + { name: 'field1', type: 'long' }, + { name: 'field2', type: 'text' }, + ]); + + const strategy = pplSearchStrategyProvider(config$, logger, client, usage); + const result = await strategy.search( + emptyRequestHandlerContext, + ({ + body: { + query: { query: 'source = empty_table', dataset: { id: 'empty-dataset' } }, + aggConfig: { + date_histogram: { + field: 'timestamp', + fixed_interval: '12h', + time_zone: 'America/Los_Angeles', + min_doc_count: 1, + }, + qs: { + '2': 'source = empty_table | stats count() by span(timestamp, 12h)', + }, + }, + }, + } as unknown) as IOpenSearchDashboardsSearchRequest, + {} + ); + + expect(result).toEqual({ + type: DATA_FRAME_TYPES.DEFAULT, + body: { + name: 'empty-dataset', + fields: [ + { name: 'field1', type: 'long', values: [] }, + { name: 'field2', type: 'text', values: [] }, + ], + schema: [ + { name: 'field1', type: 'long', values: [] }, + { name: 'field2', type: 'text', values: [] }, + ], + meta: { + date_histogram: { + field: 'timestamp', + fixed_interval: '12h', + time_zone: 'America/Los_Angeles', + min_doc_count: 1, + }, + qs: { '2': 'source = empty_table | stats count() by span(timestamp, 12h)' }, + }, + size: 2, + }, + took: 10, + }); + expect(usage.trackSuccess).toHaveBeenCalledWith(10); + }); +}); diff --git a/src/plugins/query_enhancements/server/search/ppl_search_strategy.ts b/src/plugins/query_enhancements/server/search/ppl_search_strategy.ts index d71ae6810fad..d47d2ca41c4a 100644 --- a/src/plugins/query_enhancements/server/search/ppl_search_strategy.ts +++ b/src/plugins/query_enhancements/server/search/ppl_search_strategy.ts @@ -14,7 +14,7 @@ import { Query, createDataFrame, } from '../../../data/common'; -import { getFields, handleFacetError } from '../../common/utils'; +import { getFields, throwFacetError } from '../../common/utils'; import { Facet } from '../utils'; import { QueryAggConfig } from '../../common'; @@ -39,7 +39,7 @@ export const pplSearchStrategyProvider = ( const aggConfig: QueryAggConfig | undefined = request.body.aggConfig; const rawResponse: any = await pplFacet.describeQuery(context, request); - if (!rawResponse.success) handleFacetError(rawResponse); + if (!rawResponse.success) throwFacetError(rawResponse); const dataFrame = createDataFrame({ name: query.dataset?.id, @@ -56,7 +56,7 @@ export const pplSearchStrategyProvider = ( for (const [key, aggQueryString] of Object.entries(aggConfig.qs)) { request.body.query.query = aggQueryString; const rawAggs: any = await pplFacet.describeQuery(context, request); - if (!rawAggs.success) handleFacetError(rawResponse); + if (!rawAggs.success) continue; (dataFrame as IDataFrameWithAggs).aggs = {}; (dataFrame as IDataFrameWithAggs).aggs[key] = rawAggs.data.datarows?.map((hit: any) => { return { diff --git a/src/plugins/query_enhancements/server/search/sql_async_search_strategy.ts b/src/plugins/query_enhancements/server/search/sql_async_search_strategy.ts index bc25f69a70f6..76642b9dbac5 100644 --- a/src/plugins/query_enhancements/server/search/sql_async_search_strategy.ts +++ b/src/plugins/query_enhancements/server/search/sql_async_search_strategy.ts @@ -13,7 +13,7 @@ import { Query, } from '../../../data/common'; import { ISearchStrategy, SearchUsage } from '../../../data/server'; -import { buildQueryStatusConfig, getFields, handleFacetError, SEARCH_STRATEGY } from '../../common'; +import { buildQueryStatusConfig, getFields, throwFacetError, SEARCH_STRATEGY } from '../../common'; import { Facet } from '../utils'; export const sqlAsyncSearchStrategyProvider = ( @@ -45,7 +45,7 @@ export const sqlAsyncSearchStrategyProvider = ( request.body = { ...request.body, lang: SEARCH_STRATEGY.SQL }; const rawResponse: any = await sqlAsyncFacet.describeQuery(context, request); - if (!rawResponse.success) handleFacetError(rawResponse); + if (!rawResponse.success) throwFacetError(rawResponse); const statusConfig = buildQueryStatusConfig(rawResponse); @@ -60,7 +60,7 @@ export const sqlAsyncSearchStrategyProvider = ( request.params = { queryId: inProgressQueryId }; const queryStatusResponse = await sqlAsyncJobsFacet.describeQuery(context, request); - if (!queryStatusResponse.success) handleFacetError(queryStatusResponse); + if (!queryStatusResponse.success) throwFacetError(queryStatusResponse); const queryStatus = queryStatusResponse.data?.status; logger.info(`sqlAsyncSearchStrategy: JOB: ${inProgressQueryId} - STATUS: ${queryStatus}`); diff --git a/src/plugins/query_enhancements/server/search/sql_search_strategy.ts b/src/plugins/query_enhancements/server/search/sql_search_strategy.ts index 8fa945c8809e..09f2775d0fe2 100644 --- a/src/plugins/query_enhancements/server/search/sql_search_strategy.ts +++ b/src/plugins/query_enhancements/server/search/sql_search_strategy.ts @@ -13,7 +13,7 @@ import { Query, createDataFrame, } from '../../../data/common'; -import { getFields, handleFacetError } from '../../common/utils'; +import { getFields, throwFacetError } from '../../common/utils'; import { Facet } from '../utils'; export const sqlSearchStrategyProvider = ( @@ -36,7 +36,7 @@ export const sqlSearchStrategyProvider = ( const query: Query = request.body.query; const rawResponse: any = await sqlFacet.describeQuery(context, request); - if (!rawResponse.success) handleFacetError(rawResponse); + if (!rawResponse.success) throwFacetError(rawResponse); const dataFrame = createDataFrame({ name: query.dataset?.id,