chore: refactor Dispatcher._scheduleJob to be sync and easier to follow (#35124)

This commit is contained in:
Dmitry Gozman 2025-03-11 09:17:38 +00:00 committed by GitHub
parent ec4c66133e
commit 2965c5790a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 66 additions and 48 deletions

View File

@ -60,53 +60,65 @@ export class Dispatcher {
}
}
private async _scheduleJob() {
// 1. Find a job/worker combination to run.
private _findFirstJobToRun() {
// Always pick the first job that can be run while respecting the project worker limit.
for (let index = 0; index < this._queue.length; index++) {
const job = this._queue[index];
const projectIdWorkerLimit = this._workerLimitPerProjectId.get(job.projectId);
if (!projectIdWorkerLimit)
return index;
const runningWorkersWithSameProjectId = this._workerSlots.filter(w => w.busy && w.worker && w.worker.projectId() === job.projectId).length;
if (runningWorkersWithSameProjectId < projectIdWorkerLimit)
return index;
}
return -1;
}
private _scheduleJob() {
// NOTE: keep this method synchronous for easier reasoning.
// 0. No more running jobs after stop.
if (this._isStopped)
return;
let jobIndex = -1;
let workerIndex = -1;
for (let index = 0; index < this._queue.length; index++) {
const job = this._queue[index];
// 2.1 Respect the project worker limit.
const projectIdWorkerLimit = this._workerLimitPerProjectId.get(job.projectId);
if (projectIdWorkerLimit) {
const runningWorkersWithSameProjectId = this._workerSlots.filter(w => w.busy && w.worker && w.worker.projectId() === job.projectId).length;
if (runningWorkersWithSameProjectId >= projectIdWorkerLimit)
continue;
}
// 2.2. Find a worker with the same hash, or just some free worker.
workerIndex = this._workerSlots.findIndex(w => !w.busy && w.worker && w.worker.hash() === job.workerHash && !w.worker.didSendStop());
if (workerIndex === -1)
workerIndex = this._workerSlots.findIndex(w => !w.busy);
jobIndex = index;
break;
}
// 2.3. No workers available, bail out.
// 1. Find a job to run.
const jobIndex = this._findFirstJobToRun();
if (jobIndex === -1)
return;
// 3. Claim both the job and the worker, run the job and release the worker.
const job = this._queue[jobIndex];
// 2. Find a worker with the same hash, or just some free worker.
let workerIndex = this._workerSlots.findIndex(w => !w.busy && w.worker && w.worker.hash() === job.workerHash && !w.worker.didSendStop());
if (workerIndex === -1)
workerIndex = this._workerSlots.findIndex(w => !w.busy);
if (workerIndex === -1) {
// No workers available, bail out.
return;
}
// 3. Claim both the job and the worker slot.
this._queue.splice(jobIndex, 1);
const jobDispatcher = new JobDispatcher(job, this._reporter, this._failureTracker, () => this.stop().catch(() => {}));
this._workerSlots[workerIndex].busy = true;
await this._startJobInWorker(workerIndex, job);
this._workerSlots[workerIndex].busy = false;
this._workerSlots[workerIndex].jobDispatcher = jobDispatcher;
// 4. Check the "finished" condition.
this._checkFinished();
// 4. Run the job. This is the only async operation.
void this._runJobInWorker(workerIndex, jobDispatcher).then(() => {
// 5. We got a free worker - perhaps we can immediately start another job?
void this._scheduleJob();
// 5. Release the worker slot.
this._workerSlots[workerIndex].jobDispatcher = undefined;
this._workerSlots[workerIndex].busy = false;
// 6. Check whether we are done or should schedule another job.
this._checkFinished();
this._scheduleJob();
});
}
private async _startJobInWorker(index: number, job: TestGroup) {
const stopCallback = () => this.stop().catch(() => {});
const jobDispatcher = new JobDispatcher(job, this._reporter, this._failureTracker, stopCallback);
private async _runJobInWorker(index: number, jobDispatcher: JobDispatcher) {
const job = jobDispatcher.job;
// 0. Perhaps skip the whole job?
if (jobDispatcher.skipWholeJob())
return;
@ -119,7 +131,6 @@ export class Dispatcher {
if (this._isStopped) // Check stopped signal after async hop.
return;
}
this._workerSlots[index].jobDispatcher = jobDispatcher;
// 2. Start the worker if it is down.
let startError;
@ -132,13 +143,12 @@ export class Dispatcher {
return;
}
// 3. Run the job.
// 3. Finally, run some tests in the worker! Or fail all of them because of startup error...
if (startError)
jobDispatcher.onExit(startError);
else
jobDispatcher.runInWorker(worker);
const result = await jobDispatcher.jobResult;
this._workerSlots[index].jobDispatcher = undefined;
this._updateCounterForWorkerHash(job.workerHash, -1);
// 4. When worker encounters error, we stop it and create a new one.
@ -148,10 +158,10 @@ export class Dispatcher {
else if (this._isWorkerRedundant(worker))
void worker.stop();
// 5. Possibly schedule a new job with leftover tests and/or retries.
// 5. Possibly queue a new job with leftover tests and/or retries.
if (!this._isStopped && result.newJob) {
this._queue.unshift(result.newJob);
this._updateCounterForWorkerHash(job.workerHash, +1);
this._updateCounterForWorkerHash(result.newJob.workerHash, +1);
}
}
@ -198,7 +208,7 @@ export class Dispatcher {
this._workerSlots.push({ busy: false });
// 2. Schedule enough jobs.
for (let i = 0; i < this._workerSlots.length; i++)
void this._scheduleJob();
this._scheduleJob();
this._checkFinished();
// 3. More jobs are scheduled when the worker becomes free.
// 4. Wait for all jobs to finish.
@ -260,6 +270,10 @@ export class Dispatcher {
class JobDispatcher {
jobResult = new ManualPromise<{ newJob?: TestGroup, didFail: boolean }>();
readonly job: TestGroup;
private _reporter: ReporterV2;
private _failureTracker: FailureTracker;
private _stopCallback: () => void;
private _listeners: RegisteredListener[] = [];
private _failedTests = new Set<TestCase>();
private _failedWithNonRetriableError = new Set<TestCase|Suite>();
@ -269,8 +283,12 @@ class JobDispatcher {
private _workerIndex = 0;
private _currentlyRunning: { test: TestCase, result: TestResult } | undefined;
constructor(private _job: TestGroup, private _reporter: ReporterV2, private _failureTracker: FailureTracker, private _stopCallback: () => void) {
this._remainingByTestId = new Map(this._job.tests.map(e => [e.id, e]));
constructor(job: TestGroup, reporter: ReporterV2, failureTracker: FailureTracker, stopCallback: () => void) {
this.job = job;
this._reporter = reporter;
this._failureTracker = failureTracker;
this._stopCallback = stopCallback;
this._remainingByTestId = new Map(this.job.tests.map(e => [e.id, e]));
}
private _onTestBegin(params: TestBeginPayload) {
@ -514,7 +532,7 @@ class JobDispatcher {
}
// This job is over, we will schedule another one.
const newJob = remaining.length ? { ...this._job, tests: remaining } : undefined;
const newJob = remaining.length ? { ...this.job, tests: remaining } : undefined;
this._finished({ didFail: true, newJob });
}
@ -535,8 +553,8 @@ class JobDispatcher {
this._workerIndex = worker.workerIndex;
const runPayload: RunPayload = {
file: this._job.requireFile,
entries: this._job.tests.map(test => {
file: this.job.requireFile,
entries: this.job.tests.map(test => {
return { testId: test.id, retry: test.results.length };
}),
};
@ -561,9 +579,9 @@ class JobDispatcher {
// the whole group to the worker process and report tests in the natural order,
// with skipped tests mixed in-between non-skipped. This makes
// for a better reporter experience.
const allTestsSkipped = this._job.tests.every(test => test.expectedStatus === 'skipped');
const allTestsSkipped = this.job.tests.every(test => test.expectedStatus === 'skipped');
if (allTestsSkipped && !this._failureTracker.hasReachedMaxFailures()) {
for (const test of this._job.tests) {
for (const test of this.job.tests) {
const result = test._appendTestResult();
this._reporter.onTestBegin?.(test, result);
result.status = 'skipped';