зеркало из https://github.com/github/vitess-gh.git
Merge pull request #8331 from tinyspeck/sarabee-vtadmin-stream-lag-charts
[vtadmin-web] Add chart for stream vreplication lag across all streams in a workflow
This commit is contained in:
Коммит
12900724f6
|
@ -18,6 +18,7 @@ import { vtadmin as pb } from '../proto/vtadmin';
|
|||
import * as errorHandler from '../errors/errorHandler';
|
||||
import { HttpFetchError, HttpResponseNotOkError, MalformedHttpResponseError } from '../errors/errorTypes';
|
||||
import { HttpOkResponse } from './responseTypes';
|
||||
import { TabletDebugVars } from '../util/tabletDebugVars';
|
||||
|
||||
/**
|
||||
* vtfetch makes HTTP requests against the given vtadmin-api endpoint
|
||||
|
@ -188,13 +189,22 @@ export const fetchTablet = async ({ clusterID, alias }: FetchTabletParams) => {
|
|||
return pb.Tablet.create(result);
|
||||
};
|
||||
|
||||
export const fetchExperimentalTabletDebugVars = async ({ clusterID, alias }: FetchTabletParams) => {
|
||||
export interface TabletDebugVarsResponse {
|
||||
params: FetchTabletParams;
|
||||
data?: TabletDebugVars;
|
||||
}
|
||||
|
||||
export const fetchExperimentalTabletDebugVars = async (params: FetchTabletParams): Promise<TabletDebugVarsResponse> => {
|
||||
if (!process.env.REACT_APP_ENABLE_EXPERIMENTAL_TABLET_DEBUG_VARS) {
|
||||
return Promise.resolve({});
|
||||
return Promise.resolve({ params });
|
||||
}
|
||||
|
||||
const { clusterID, alias } = params;
|
||||
const { result } = await vtfetch(`/api/experimental/tablet/${alias}/debug/vars?cluster=${clusterID}`);
|
||||
return result;
|
||||
|
||||
// /debug/vars doesn't contain cluster/tablet information, so we
|
||||
// return that as part of the response.
|
||||
return { params, data: result };
|
||||
};
|
||||
|
||||
export const fetchTablets = async () =>
|
||||
|
|
|
@ -37,7 +37,7 @@ export const TabletQPSChart = ({ alias, clusterID }: Props) => {
|
|||
);
|
||||
|
||||
const options = useMemo(() => {
|
||||
const tsdata = getQPSTimeseries(debugVars, query.dataUpdatedAt);
|
||||
const tsdata = getQPSTimeseries(debugVars?.data, query.dataUpdatedAt);
|
||||
|
||||
const series: Highcharts.SeriesOptionsType[] = Object.entries(tsdata).map(([name, data]) => ({
|
||||
data,
|
||||
|
|
|
@ -37,7 +37,7 @@ export const TabletVReplicationQPSChart = ({ alias, clusterID }: Props) => {
|
|||
);
|
||||
|
||||
const options = useMemo(() => {
|
||||
const tsdata = getVReplicationQPSTimeseries(debugVars, query.dataUpdatedAt);
|
||||
const tsdata = getVReplicationQPSTimeseries(debugVars?.data, query.dataUpdatedAt);
|
||||
|
||||
const series: Highcharts.SeriesOptionsType[] = Object.entries(tsdata).map(([name, data]) => ({
|
||||
data,
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* Copyright 2021 The Vitess Authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { UseQueryResult } from 'react-query';
|
||||
import { TabletDebugVarsResponse } from '../../api/http';
|
||||
import { vtadmin as pb } from '../../proto/vtadmin';
|
||||
import { formatSeries } from './WorkflowStreamsLagChart';
|
||||
|
||||
describe('WorkflowStreamsLagChart', () => {
|
||||
describe('formatSeries', () => {
|
||||
it('should return series for all streams in the workflow', () => {
|
||||
const workflow = pb.Workflow.create({
|
||||
cluster: {
|
||||
id: 'zone1',
|
||||
name: 'zone1',
|
||||
},
|
||||
workflow: {
|
||||
shard_streams: {
|
||||
'-80/us_east_1a-123456': {
|
||||
streams: [
|
||||
{ id: 1, shard: '-80', tablet: { cell: 'us_east_1a', uid: 123456 } },
|
||||
{ id: 2, shard: '-80', tablet: { cell: 'us_east_1a', uid: 123456 } },
|
||||
],
|
||||
},
|
||||
'80-/us_east_1a-789012': {
|
||||
streams: [{ id: 1, shard: '80-', tablet: { cell: 'us_east_1a', uid: 789012 } }],
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const queries: Partial<UseQueryResult<TabletDebugVarsResponse, Error>>[] = [
|
||||
{
|
||||
data: {
|
||||
params: { alias: 'us_east_1a-123456', clusterID: 'zone1' },
|
||||
data: {
|
||||
VReplicationLag: {
|
||||
All: [3, 3, 3],
|
||||
'1': [1, 1, 1],
|
||||
'2': [2, 2, 2],
|
||||
},
|
||||
},
|
||||
},
|
||||
dataUpdatedAt: 1000000000000,
|
||||
},
|
||||
{
|
||||
data: {
|
||||
params: { alias: 'us_east_1a-789012', clusterID: 'zone1' },
|
||||
data: {
|
||||
VReplicationLag: {
|
||||
All: [],
|
||||
'1': [1, 1, 1],
|
||||
// Some other stream running on the tablet that isn't part
|
||||
// of this workflow.
|
||||
'2': [2, 2, 2],
|
||||
},
|
||||
},
|
||||
},
|
||||
dataUpdatedAt: 1000000000000,
|
||||
},
|
||||
];
|
||||
|
||||
// A sneaky cast to UseQueryResult since otherwise enumerating the many fields
|
||||
// UseQueryResult (most of which we don't use) is pointlessly verbose.
|
||||
const result = formatSeries(workflow, queries as UseQueryResult<TabletDebugVarsResponse, Error>[]);
|
||||
|
||||
// Use snapshot matching since defining expected values for arrays of 180 data points is... annoying.
|
||||
expect(result).toMatchSnapshot();
|
||||
|
||||
// ...but! Add additional validation so that failing tests are easier to debug.
|
||||
// (And because it can be tempting to not examine snapshot changes in detail...) :)
|
||||
expect(result.length).toEqual(3);
|
||||
|
||||
expect(result[0].name).toEqual('us_east_1a-123456/1');
|
||||
expect(result[1].name).toEqual('us_east_1a-123456/2');
|
||||
expect(result[2].name).toEqual('us_east_1a-789012/1');
|
||||
});
|
||||
|
||||
it('should handle empty input', () => {
|
||||
const result = formatSeries(null, []);
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,125 @@
|
|||
/**
|
||||
* Copyright 2021 The Vitess Authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { useMemo } from 'react';
|
||||
|
||||
import { useManyExperimentalTabletDebugVars, useWorkflow } from '../../hooks/api';
|
||||
import { vtadmin } from '../../proto/vtadmin';
|
||||
import { getStreamVReplicationLagTimeseries, QPS_REFETCH_INTERVAL } from '../../util/tabletDebugVars';
|
||||
import { formatStreamKey, getStreams, getStreamTablets } from '../../util/workflows';
|
||||
import { Timeseries } from './Timeseries';
|
||||
|
||||
interface Props {
|
||||
clusterID: string;
|
||||
keyspace: string;
|
||||
workflowName: string;
|
||||
}
|
||||
|
||||
// Default min/max values (in seconds) for the y-axis when there is no data to show.
|
||||
const DEFAULT_Y_MAX = 5;
|
||||
const DEFAULT_Y_MIN = 0;
|
||||
|
||||
export const WorkflowStreamsLagChart = ({ clusterID, keyspace, workflowName }: Props) => {
|
||||
const { data: workflow, ...wq } = useWorkflow({ clusterID, keyspace, name: workflowName });
|
||||
|
||||
const queryParams = useMemo(() => {
|
||||
const aliases = getStreamTablets(workflow);
|
||||
return aliases.map((alias) => ({ alias, clusterID }));
|
||||
}, [clusterID, workflow]);
|
||||
|
||||
const tabletQueries = useManyExperimentalTabletDebugVars(queryParams, {
|
||||
enabled: !!workflow,
|
||||
refetchInterval: QPS_REFETCH_INTERVAL,
|
||||
refetchIntervalInBackground: true,
|
||||
});
|
||||
|
||||
const anyLoading = wq.isLoading || tabletQueries.some((q) => q.isLoading);
|
||||
|
||||
const chartOptions: Highcharts.Options = useMemo(() => {
|
||||
const series = formatSeries(workflow, tabletQueries);
|
||||
const allSeriesEmpty = series.every((s) => !s.data?.length);
|
||||
|
||||
return {
|
||||
series,
|
||||
yAxis: {
|
||||
labels: {
|
||||
format: '{text} s',
|
||||
},
|
||||
// The desired behaviour is to show axes + grid lines
|
||||
// even when there is no data to show. Unfortunately, setting
|
||||
// softMin/softMax (which is more flexible) doesn't work with showEmpty.
|
||||
// Instead, we must set explicit min/max, but only when all series are empty.
|
||||
// If at least one series has data, allow min/max to be automatically calculated.
|
||||
max: allSeriesEmpty ? DEFAULT_Y_MAX : null,
|
||||
min: allSeriesEmpty ? DEFAULT_Y_MIN : null,
|
||||
},
|
||||
};
|
||||
}, [tabletQueries, workflow]);
|
||||
|
||||
return <Timeseries isLoading={anyLoading} options={chartOptions} />;
|
||||
};
|
||||
|
||||
// Internal function, exported only for testing.
|
||||
export const formatSeries = (
|
||||
workflow: vtadmin.Workflow | null | undefined,
|
||||
tabletQueries: ReturnType<typeof useManyExperimentalTabletDebugVars>
|
||||
): Highcharts.SeriesLineOptions[] => {
|
||||
if (!workflow) {
|
||||
return [];
|
||||
}
|
||||
|
||||
// Get streamKeys for streams in this workflow.
|
||||
const streamKeys = getStreams(workflow).map((s) => formatStreamKey(s));
|
||||
|
||||
// Initialize the timeseries from the workflow, so that every stream in the workflow
|
||||
// is shown in the legend, even if the /debug/vars data isn't (yet) available.
|
||||
const seriesByStreamKey: { [streamKey: string]: Highcharts.SeriesLineOptions } = {};
|
||||
|
||||
streamKeys.forEach((streamKey) => {
|
||||
if (streamKey) {
|
||||
seriesByStreamKey[streamKey] = { data: [], name: streamKey, type: 'line' };
|
||||
}
|
||||
});
|
||||
|
||||
tabletQueries.forEach((tq) => {
|
||||
if (!tq.data) {
|
||||
return;
|
||||
}
|
||||
|
||||
const tabletAlias = tq.data.params.alias;
|
||||
|
||||
const lagData = getStreamVReplicationLagTimeseries(tq.data.data, tq.dataUpdatedAt);
|
||||
Object.entries(lagData).forEach(([streamID, streamLagData]) => {
|
||||
// Don't graph aggregate vreplication lag for the tablet, since that
|
||||
// can include vreplication lag data for streams running on the tablet
|
||||
// that are not in the current workflow.
|
||||
if (streamID === 'All') {
|
||||
return;
|
||||
}
|
||||
|
||||
const streamKey = `${tabletAlias}/${streamID}`;
|
||||
|
||||
// Don't graph series for streams that aren't in this workflow.
|
||||
if (!(streamKey in seriesByStreamKey)) {
|
||||
return;
|
||||
}
|
||||
|
||||
seriesByStreamKey[streamKey].data = streamLagData;
|
||||
});
|
||||
});
|
||||
|
||||
return Object.values(seriesByStreamKey);
|
||||
};
|
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
|
@ -29,6 +29,7 @@ import { DataTable } from '../../dataTable/DataTable';
|
|||
import { KeyspaceLink } from '../../links/KeyspaceLink';
|
||||
import { TabletLink } from '../../links/TabletLink';
|
||||
import { StreamStatePip } from '../../pips/StreamStatePip';
|
||||
import { WorkflowStreamsLagChart } from '../../charts/WorkflowStreamsLagChart';
|
||||
|
||||
interface Props {
|
||||
clusterID: string;
|
||||
|
@ -39,7 +40,7 @@ interface Props {
|
|||
const COLUMNS = ['Stream', 'Source', 'Target', 'Tablet'];
|
||||
|
||||
export const WorkflowStreams = ({ clusterID, keyspace, name }: Props) => {
|
||||
const { data } = useWorkflow({ clusterID, keyspace, name }, { refetchInterval: 1000 });
|
||||
const { data } = useWorkflow({ clusterID, keyspace, name });
|
||||
|
||||
const streams = useMemo(() => {
|
||||
const rows = getStreams(data).map((stream) => ({
|
||||
|
@ -107,6 +108,14 @@ export const WorkflowStreams = ({ clusterID, keyspace, name }: Props) => {
|
|||
|
||||
return (
|
||||
<div>
|
||||
{process.env.REACT_APP_ENABLE_EXPERIMENTAL_TABLET_DEBUG_VARS && (
|
||||
<>
|
||||
<h3>Stream VReplication Lag</h3>
|
||||
<WorkflowStreamsLagChart clusterID={clusterID} keyspace={keyspace} workflowName={name} />
|
||||
</>
|
||||
)}
|
||||
|
||||
<h3>Streams</h3>
|
||||
{/* TODO(doeg): add a protobuf enum for this (https://github.com/vitessio/vitess/projects/12#card-60190340) */}
|
||||
{['Error', 'Copying', 'Running', 'Stopped'].map((streamState) => {
|
||||
if (!Array.isArray(streamsByState[streamState])) {
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
import { useQuery, useQueryClient, UseQueryOptions } from 'react-query';
|
||||
import { useQueries, useQuery, useQueryClient, UseQueryOptions, UseQueryResult } from 'react-query';
|
||||
import {
|
||||
fetchClusters,
|
||||
fetchExperimentalTabletDebugVars,
|
||||
|
@ -23,15 +23,16 @@ import {
|
|||
FetchSchemaParams,
|
||||
fetchSchemas,
|
||||
fetchTablet,
|
||||
FetchTabletParams,
|
||||
fetchTablets,
|
||||
fetchVSchema,
|
||||
FetchVSchemaParams,
|
||||
fetchVTExplain,
|
||||
fetchWorkflow,
|
||||
fetchWorkflows,
|
||||
TabletDebugVarsResponse,
|
||||
} from '../api/http';
|
||||
import { vtadmin as pb } from '../proto/vtadmin';
|
||||
import { TabletDebugVars } from '../util/tabletDebugVars';
|
||||
import { formatAlias } from '../util/tablets';
|
||||
|
||||
/**
|
||||
|
@ -81,8 +82,8 @@ export const useTablet = (params: Parameters<typeof fetchTablet>[0], options?: U
|
|||
};
|
||||
|
||||
export const useExperimentalTabletDebugVars = (
|
||||
params: Parameters<typeof fetchExperimentalTabletDebugVars>[0],
|
||||
options?: UseQueryOptions<TabletDebugVars, Error>
|
||||
params: FetchTabletParams,
|
||||
options?: UseQueryOptions<TabletDebugVarsResponse, Error>
|
||||
) => {
|
||||
return useQuery(
|
||||
['experimental/tablet/debug/vars', params],
|
||||
|
@ -91,6 +92,22 @@ export const useExperimentalTabletDebugVars = (
|
|||
);
|
||||
};
|
||||
|
||||
// Future enhancement: add vtadmin-api endpoint to fetch /debug/vars
|
||||
// for multiple tablets in a single request. https://github.com/vitessio/vitess/projects/12#card-63086674
|
||||
export const useManyExperimentalTabletDebugVars = (
|
||||
params: FetchTabletParams[],
|
||||
defaultOptions: UseQueryOptions<TabletDebugVarsResponse, Error> = {}
|
||||
) => {
|
||||
// Robust typing for useQueries is still in progress, so we do
|
||||
// some sneaky type-casting. See https://github.com/tannerlinsley/react-query/issues/1675
|
||||
const queries = params.map((p) => ({
|
||||
queryKey: ['experimental/tablet/debug/vars', p],
|
||||
queryFn: () => fetchExperimentalTabletDebugVars(p),
|
||||
...(defaultOptions as any),
|
||||
}));
|
||||
return useQueries(queries) as UseQueryResult<TabletDebugVarsResponse, Error>[];
|
||||
};
|
||||
|
||||
/**
|
||||
* useWorkflowsResponse is a query hook that fetches all workflows (by cluster) across every cluster.
|
||||
*/
|
||||
|
|
|
@ -37,6 +37,7 @@ export type TabletDebugVars = Partial<{
|
|||
QPS: { [k: string]: number[] };
|
||||
|
||||
// See https://github.com/vitessio/vitess/blob/main/go/vt/vttablet/tabletmanager/vreplication/stats.go
|
||||
VReplicationLag: { [k: string]: number[] };
|
||||
VReplicationQPS: { [k: string]: number[] };
|
||||
}>;
|
||||
|
||||
|
@ -50,6 +51,11 @@ export type TimeseriesMap = { [seriesName: string]: TimeseriesPoint[] };
|
|||
export const getQPSTimeseries = (d: TabletDebugVars | null | undefined, endAt?: number): TimeseriesMap =>
|
||||
formatTimeseriesMap(d?.QPS || {}, endAt);
|
||||
|
||||
export const getStreamVReplicationLagTimeseries = (
|
||||
d: TabletDebugVars | null | undefined,
|
||||
endAt?: number
|
||||
): TimeseriesMap => formatTimeseriesMap(d?.VReplicationLag || {}, endAt);
|
||||
|
||||
export const getVReplicationQPSTimeseries = (d: TabletDebugVars | null | undefined, endAt?: number): TimeseriesMap =>
|
||||
formatTimeseriesMap(d?.VReplicationQPS || {}, endAt);
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
import { vtadmin as pb } from '../proto/vtadmin';
|
||||
import { getStreams } from './workflows';
|
||||
import { getStreams, getStreamTablets } from './workflows';
|
||||
|
||||
describe('getStreams', () => {
|
||||
const tests: {
|
||||
|
@ -70,3 +70,50 @@ describe('getStreams', () => {
|
|||
}
|
||||
);
|
||||
});
|
||||
|
||||
describe('getStreamTablets', () => {
|
||||
const tests: {
|
||||
name: string;
|
||||
input: Parameters<typeof getStreamTablets>;
|
||||
expected: ReturnType<typeof getStreamTablets>;
|
||||
}[] = [
|
||||
{
|
||||
name: 'should return a set of unique tablet aliases',
|
||||
input: [
|
||||
pb.Workflow.create({
|
||||
workflow: {
|
||||
shard_streams: {
|
||||
'-80/us_east_1a-123456': {
|
||||
streams: [
|
||||
{ id: 1, shard: '-80', tablet: { cell: 'us_east_1a', uid: 123456 } },
|
||||
{ id: 2, shard: '-80', tablet: { cell: 'us_east_1a', uid: 123456 } },
|
||||
],
|
||||
},
|
||||
'80-/us_east_1a-789012': {
|
||||
streams: [{ id: 1, shard: '80-', tablet: { cell: 'us_east_1a', uid: 789012 } }],
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
],
|
||||
expected: ['us_east_1a-123456', 'us_east_1a-789012'],
|
||||
},
|
||||
{
|
||||
name: 'should handle empty workflow',
|
||||
input: [pb.Workflow.create()],
|
||||
expected: [],
|
||||
},
|
||||
{
|
||||
name: 'should handle null input',
|
||||
input: [null],
|
||||
expected: [],
|
||||
},
|
||||
];
|
||||
|
||||
test.each(tests.map(Object.values))(
|
||||
'%s',
|
||||
(name: string, input: Parameters<typeof getStreamTablets>, expected: ReturnType<typeof getStreamTablets>) => {
|
||||
expect(getStreamTablets(...input)).toEqual(expected);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
|
|
@ -59,3 +59,24 @@ export const getTimeUpdated = <W extends pb.IWorkflow>(workflow: W | null | unde
|
|||
const timestamps = getStreams(workflow).map((s) => parseInt(`${s.time_updated?.seconds}`, 10));
|
||||
return Math.max(...timestamps);
|
||||
};
|
||||
|
||||
/**
|
||||
* getStreamTablets returns an unordered set of tablet alias strings across all streams
|
||||
* in the workflow.
|
||||
*/
|
||||
export const getStreamTablets = <W extends pb.IWorkflow>(workflow: W | null | undefined): string[] => {
|
||||
const streams = getStreams(workflow);
|
||||
if (!Array.isArray(streams)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const aliases = new Set<string>();
|
||||
streams.forEach((stream) => {
|
||||
const alias = formatAlias(stream.tablet);
|
||||
if (alias) {
|
||||
aliases.add(alias);
|
||||
}
|
||||
});
|
||||
|
||||
return [...aliases];
|
||||
};
|
||||
|
|
Загрузка…
Ссылка в новой задаче