chore: extract pipe->connection code (#34689)

This commit is contained in:
Pavel Feldman 2025-02-10 15:04:33 -08:00 committed by GitHub
parent 2718ce7cbf
commit 51f944d16a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 43 additions and 64 deletions

View File

@ -18,7 +18,6 @@ import { EventEmitter } from 'events';
import { BrowserContext, prepareBrowserContextParams } from './browserContext'; import { BrowserContext, prepareBrowserContextParams } from './browserContext';
import { ChannelOwner } from './channelOwner'; import { ChannelOwner } from './channelOwner';
import { Connection } from './connection';
import { TargetClosedError, isTargetClosedError } from './errors'; import { TargetClosedError, isTargetClosedError } from './errors';
import { Events } from './events'; import { Events } from './events';
import { Waiter } from './waiter'; import { Waiter } from './waiter';
@ -72,45 +71,28 @@ export class Android extends ChannelOwner<channels.AndroidChannel> implements ap
const headers = { 'x-playwright-browser': 'android', ...options.headers }; const headers = { 'x-playwright-browser': 'android', ...options.headers };
const localUtils = this._connection.localUtils(); const localUtils = this._connection.localUtils();
const connectParams: channels.LocalUtilsConnectParams = { wsEndpoint, headers, slowMo: options.slowMo, timeout: options.timeout }; const connectParams: channels.LocalUtilsConnectParams = { wsEndpoint, headers, slowMo: options.slowMo, timeout: options.timeout };
const { pipe } = await localUtils.connect(connectParams); const connection = await localUtils.connect(connectParams);
const closePipe = () => pipe.close().catch(() => {});
const connection = new Connection(localUtils, this._platform, this._instrumentation);
connection.markAsRemote();
connection.on('close', closePipe);
let device: AndroidDevice; let device: AndroidDevice;
let closeError: string | undefined; connection.on('close', () => {
const onPipeClosed = () => {
device?._didClose(); device?._didClose();
connection.close(closeError);
};
pipe.on('closed', onPipeClosed);
connection.onmessage = message => pipe.send({ message }).catch(onPipeClosed);
pipe.on('message', ({ message }) => {
try {
connection!.dispatch(message);
} catch (e) {
closeError = String(e);
closePipe();
}
}); });
const result = await raceAgainstDeadline(async () => { const result = await raceAgainstDeadline(async () => {
const playwright = await connection!.initializePlaywright(); const playwright = await connection!.initializePlaywright();
if (!playwright._initializer.preConnectedAndroidDevice) { if (!playwright._initializer.preConnectedAndroidDevice) {
closePipe(); connection.close();
throw new Error('Malformed endpoint. Did you use Android.launchServer method?'); throw new Error('Malformed endpoint. Did you use Android.launchServer method?');
} }
device = AndroidDevice.from(playwright._initializer.preConnectedAndroidDevice!); device = AndroidDevice.from(playwright._initializer.preConnectedAndroidDevice!);
device._shouldCloseConnectionOnClose = true; device._shouldCloseConnectionOnClose = true;
device.on(Events.AndroidDevice.Close, closePipe); device.on(Events.AndroidDevice.Close, () => connection.close());
return device; return device;
}, deadline); }, deadline);
if (!result.timedOut) { if (!result.timedOut) {
return result.result; return result.result;
} else { } else {
closePipe(); connection.close();
throw new Error(`Timeout ${options.timeout}ms exceeded`); throw new Error(`Timeout ${options.timeout}ms exceeded`);
} }
}); });

View File

@ -24,7 +24,7 @@ import { mkdirIfNeeded } from '../utils/fileUtils';
import type { BrowserType } from './browserType'; import type { BrowserType } from './browserType';
import type { Page } from './page'; import type { Page } from './page';
import type { BrowserContextOptions, HeadersArray, LaunchOptions } from './types'; import type { BrowserContextOptions, LaunchOptions } from './types';
import type * as api from '../../types/types'; import type * as api from '../../types/types';
import type * as channels from '@protocol/channels'; import type * as channels from '@protocol/channels';
@ -37,9 +37,6 @@ export class Browser extends ChannelOwner<channels.BrowserChannel> implements ap
_options: LaunchOptions = {}; _options: LaunchOptions = {};
readonly _name: string; readonly _name: string;
private _path: string | undefined; private _path: string | undefined;
// Used from @playwright/test fixtures.
_connectHeaders?: HeadersArray;
_closeReason: string | undefined; _closeReason: string | undefined;
static from(browser: channels.BrowserChannel): Browser { static from(browser: channels.BrowserChannel): Browser {

View File

@ -18,7 +18,6 @@ import { Browser } from './browser';
import { BrowserContext, prepareBrowserContextParams } from './browserContext'; import { BrowserContext, prepareBrowserContextParams } from './browserContext';
import { ChannelOwner } from './channelOwner'; import { ChannelOwner } from './channelOwner';
import { envObjectToArray } from './clientHelper'; import { envObjectToArray } from './clientHelper';
import { Connection } from './connection';
import { Events } from './events'; import { Events } from './events';
import { assert } from '../utils/debug'; import { assert } from '../utils/debug';
import { headersObjectToArray } from '../utils/headers'; import { headersObjectToArray } from '../utils/headers';
@ -133,40 +132,16 @@ export class BrowserType extends ChannelOwner<channels.BrowserTypeChannel> imple
}; };
if ((params as any).__testHookRedirectPortForwarding) if ((params as any).__testHookRedirectPortForwarding)
connectParams.socksProxyRedirectPortForTest = (params as any).__testHookRedirectPortForwarding; connectParams.socksProxyRedirectPortForTest = (params as any).__testHookRedirectPortForwarding;
const { pipe, headers: connectHeaders } = await localUtils.connect(connectParams); const connection = await localUtils.connect(connectParams);
const closePipe = () => pipe.close().catch(() => {});
const connection = new Connection(localUtils, this._platform, this._instrumentation);
connection.markAsRemote();
connection.on('close', closePipe);
let browser: Browser; let browser: Browser;
let closeError: string | undefined; connection.on('close', () => {
const onPipeClosed = (reason?: string) => {
// Emulate all pages, contexts and the browser closing upon disconnect. // Emulate all pages, contexts and the browser closing upon disconnect.
for (const context of browser?.contexts() || []) { for (const context of browser?.contexts() || []) {
for (const page of context.pages()) for (const page of context.pages())
page._onClose(); page._onClose();
context._onClose(); context._onClose();
} }
connection.close(reason || closeError);
// Give a chance to any API call promises to reject upon page/context closure.
// This happens naturally when we receive page.onClose and browser.onClose from the server
// in separate tasks. However, upon pipe closure we used to dispatch them all synchronously
// here and promises did not have a chance to reject.
// The order of rejects vs closure is a part of the API contract and our test runner
// relies on it to attribute rejections to the right test.
setTimeout(() => browser?._didClose(), 0); setTimeout(() => browser?._didClose(), 0);
};
pipe.on('closed', params => onPipeClosed(params.reason));
connection.onmessage = message => this._wrapApiCall(() => pipe.send({ message }).catch(() => onPipeClosed()), /* isInternal */ true);
pipe.on('message', ({ message }) => {
try {
connection!.dispatch(message);
} catch (e) {
closeError = String(e);
closePipe();
}
}); });
const result = await raceAgainstDeadline(async () => { const result = await raceAgainstDeadline(async () => {
@ -176,21 +151,20 @@ export class BrowserType extends ChannelOwner<channels.BrowserTypeChannel> imple
const playwright = await connection!.initializePlaywright(); const playwright = await connection!.initializePlaywright();
if (!playwright._initializer.preLaunchedBrowser) { if (!playwright._initializer.preLaunchedBrowser) {
closePipe(); connection.close();
throw new Error('Malformed endpoint. Did you use BrowserType.launchServer method?'); throw new Error('Malformed endpoint. Did you use BrowserType.launchServer method?');
} }
playwright._setSelectors(this._playwright.selectors); playwright._setSelectors(this._playwright.selectors);
browser = Browser.from(playwright._initializer.preLaunchedBrowser!); browser = Browser.from(playwright._initializer.preLaunchedBrowser!);
this._didLaunchBrowser(browser, {}, logger); this._didLaunchBrowser(browser, {}, logger);
browser._shouldCloseConnectionOnClose = true; browser._shouldCloseConnectionOnClose = true;
browser._connectHeaders = connectHeaders; browser.on(Events.Browser.Disconnected, () => connection.close());
browser.on(Events.Browser.Disconnected, () => this._wrapApiCall(() => closePipe(), /* isInternal */ true));
return browser; return browser;
}, deadline); }, deadline);
if (!result.timedOut) { if (!result.timedOut) {
return result.result; return result.result;
} else { } else {
closePipe(); connection.close();
throw new Error(`Timeout ${params.timeout}ms exceeded`); throw new Error(`Timeout ${params.timeout}ms exceeded`);
} }
}); });

View File

@ -47,6 +47,7 @@ import { formatCallLog, rewriteErrorMessage } from '../utils/stackTrace';
import { zones } from '../utils/zones'; import { zones } from '../utils/zones';
import type { ClientInstrumentation } from './clientInstrumentation'; import type { ClientInstrumentation } from './clientInstrumentation';
import type { HeadersArray } from './types';
import type { ValidatorContext } from '../protocol/validator'; import type { ValidatorContext } from '../protocol/validator';
import type { Platform } from '../utils/platform'; import type { Platform } from '../utils/platform';
import type * as channels from '@protocol/channels'; import type * as channels from '@protocol/channels';
@ -81,13 +82,16 @@ export class Connection extends EventEmitter {
private _tracingCount = 0; private _tracingCount = 0;
readonly _instrumentation: ClientInstrumentation; readonly _instrumentation: ClientInstrumentation;
readonly platform: Platform; readonly platform: Platform;
// Used from @playwright/test fixtures -> TODO remove?
readonly headers: HeadersArray;
constructor(localUtils: LocalUtils | undefined, platform: Platform, instrumentation: ClientInstrumentation | undefined) { constructor(localUtils: LocalUtils | undefined, platform: Platform, instrumentation: ClientInstrumentation | undefined, headers: HeadersArray) {
super(); super();
this._instrumentation = instrumentation || createInstrumentation(); this._instrumentation = instrumentation || createInstrumentation();
this._localUtils = localUtils; this._localUtils = localUtils;
this.platform = platform; this.platform = platform;
this._rootObject = new Root(this); this._rootObject = new Root(this);
this.headers = headers;
} }
markAsRemote() { markAsRemote() {

View File

@ -15,6 +15,7 @@
*/ */
import { ChannelOwner } from './channelOwner'; import { ChannelOwner } from './channelOwner';
import { Connection } from './connection';
import * as localUtils from '../utils/localUtils'; import * as localUtils from '../utils/localUtils';
import type { Size } from './types'; import type { Size } from './types';
@ -76,7 +77,28 @@ export class LocalUtils extends ChannelOwner<channels.LocalUtilsChannel> {
return await localUtils.addStackToTracingNoReply(this._stackSessions, params); return await localUtils.addStackToTracingNoReply(this._stackSessions, params);
} }
async connect(params: channels.LocalUtilsConnectParams): Promise<channels.LocalUtilsConnectResult> { async connect(params: channels.LocalUtilsConnectParams): Promise<Connection> {
return await this._channel.connect(params); const { pipe, headers: connectHeaders } = await this._channel.connect(params);
const closePipe = () => this._wrapApiCall(() => pipe.close().catch(() => {}), /* isInternal */ true);
const connection = new Connection(this, this._platform, this._instrumentation, connectHeaders);
connection.markAsRemote();
connection.on('close', closePipe);
let closeError: string | undefined;
const onPipeClosed = (reason?: string) => {
connection.close(reason || closeError);
};
pipe.on('closed', params => onPipeClosed(params.reason));
connection.onmessage = message => this._wrapApiCall(() => pipe.send({ message }).catch(() => onPipeClosed()), /* isInternal */ true);
pipe.on('message', ({ message }) => {
try {
connection!.dispatch(message);
} catch (e) {
closeError = String(e);
closePipe();
}
});
return connection;
} }
} }

View File

@ -26,7 +26,7 @@ import type { Platform } from './utils/platform';
export function createInProcessPlaywright(platform: Platform): PlaywrightAPI { export function createInProcessPlaywright(platform: Platform): PlaywrightAPI {
const playwright = createPlaywright({ sdkLanguage: (process.env.PW_LANG_NAME as Language | undefined) || 'javascript' }); const playwright = createPlaywright({ sdkLanguage: (process.env.PW_LANG_NAME as Language | undefined) || 'javascript' });
const clientConnection = new Connection(undefined, platform, undefined); const clientConnection = new Connection(undefined, platform, undefined, []);
clientConnection.useRawBuffers(); clientConnection.useRawBuffers();
const dispatcherConnection = new DispatcherConnection(true /* local */); const dispatcherConnection = new DispatcherConnection(true /* local */);

View File

@ -48,7 +48,7 @@ class PlaywrightClient {
this._driverProcess.unref(); this._driverProcess.unref();
this._driverProcess.stderr!.on('data', data => process.stderr.write(data)); this._driverProcess.stderr!.on('data', data => process.stderr.write(data));
const connection = new Connection(undefined, nodePlatform, undefined); const connection = new Connection(undefined, nodePlatform, undefined, []);
const transport = new PipeTransport(this._driverProcess.stdin!, this._driverProcess.stdout!); const transport = new PipeTransport(this._driverProcess.stdin!, this._driverProcess.stdout!);
connection.onmessage = message => transport.send(JSON.stringify(message)); connection.onmessage = message => transport.send(JSON.stringify(message));
transport.onmessage = message => connection.dispatch(JSON.parse(message)); transport.onmessage = message => connection.dispatch(JSON.parse(message));

View File

@ -471,7 +471,7 @@ function normalizeScreenshotMode(screenshot: ScreenshotOption): ScreenshotMode {
} }
function attachConnectedHeaderIfNeeded(testInfo: TestInfo, browser: Browser | null) { function attachConnectedHeaderIfNeeded(testInfo: TestInfo, browser: Browser | null) {
const connectHeaders: { name: string, value: string }[] | undefined = (browser as any)?._connectHeaders; const connectHeaders: { name: string, value: string }[] | undefined = (browser as any)?._connection.headers;
if (!connectHeaders) if (!connectHeaders)
return; return;
for (const header of connectHeaders) { for (const header of connectHeaders) {