Commit 23dffed3 authored by Robert Knight

Import multiple annotations concurrently

Speed up large imports by allowing up to 5 imports to be in-flight concurrently.
The simplest approach to this would be to divide the annotations into batches
and save one batch at a time. However, due to the variability in the time that
an individual import can take, this can lead to sub-optimal concurrency. Instead,
structure the code so that we try to always keep `MAX_CONCURRENT_IMPORTS`
imports in flight at once, as long as there are that many remaining.

Fixes https://github.com/hypothesis/client/issues/5739
parent 5cce2e59
...@@ -4,6 +4,9 @@ import type { SidebarStore } from '../store'; ...@@ -4,6 +4,9 @@ import type { SidebarStore } from '../store';
import type { AnnotationsService } from './annotations'; import type { AnnotationsService } from './annotations';
import type { ToastMessengerService } from './toast-messenger'; import type { ToastMessengerService } from './toast-messenger';
/**
 * Maximum number of annotation imports that may be in flight at the same time.
 */
export const MAX_CONCURRENT_IMPORTS = 5;
/** /**
* The subset of annotation fields which are preserved during an import. * The subset of annotation fields which are preserved during an import.
*/ */
...@@ -116,19 +119,20 @@ export class ImportAnnotationsService { ...@@ -116,19 +119,20 @@ export class ImportAnnotationsService {
/** /**
* Import annotations. * Import annotations.
*
* Returns an array of the results of each import. The results are in the
* same order as the input annotation list. Each result can either be a
* successful import, a skipped import, or an error.
*/ */
async import(anns: APIAnnotationData[]): Promise<ImportResult[]> { async import(anns: APIAnnotationData[]): Promise<ImportResult[]> {
this._store.beginImport(anns.length); this._store.beginImport(anns.length);
const existingAnns = this._store.allAnnotations(); const existingAnns = this._store.allAnnotations();
const results: ImportResult[] = [];
for (const ann of anns) { const importAnn = async (ann: APIAnnotationData): Promise<ImportResult> => {
const existingAnn = existingAnns.find(ex => duplicateMatch(ann, ex)); const existingAnn = existingAnns.find(ex => duplicateMatch(ann, ex));
if (existingAnn) { if (existingAnn) {
results.push({ type: 'duplicate', annotation: existingAnn }); return { type: 'duplicate', annotation: existingAnn };
this._store.completeImport(1);
continue;
} }
try { try {
...@@ -143,13 +147,41 @@ export class ImportAnnotationsService { ...@@ -143,13 +147,41 @@ export class ImportAnnotationsService {
// Persist the annotation. // Persist the annotation.
const saved = await this._annotationsService.save(saveData); const saved = await this._annotationsService.save(saveData);
results.push({ type: 'import', annotation: saved }); return { type: 'import', annotation: saved };
} catch (error) { } catch (error) {
results.push({ type: 'error', error }); return { type: 'error', error };
} finally { }
};
// Save annotations to the server, allowing a maximum of
// `MAX_CONCURRENT_IMPORTS` in-flight requests at a time.
const results: ImportResult[] = [];
const queue = anns.map((ann, index) => ({ ann, index }));
const active: Array<Promise<void>> = [];
while (queue.length > 0) {
const task = queue.shift()!;
const done = importAnn(task.ann)
.then(result => {
this._store.completeImport(1); this._store.completeImport(1);
results[task.index] = result;
})
.then(() => {
const idx = active.indexOf(done);
// nb. `idx` should always be >= 0 here.
if (idx !== -1) {
active.splice(idx, 1);
} }
});
active.push(done);
// When we reach max concurrency, wait for at least one import to complete.
if (active.length >= MAX_CONCURRENT_IMPORTS) {
await Promise.race(active);
} }
}
// Wait for all remaining imports to complete.
await Promise.all(active);
const { messageType, message } = importStatus(results); const { messageType, message } = importStatus(results);
if (messageType === 'success') { if (messageType === 'success') {
......
...@@ -84,6 +84,25 @@ describe('ImportAnnotationsService', () => { ...@@ -84,6 +84,25 @@ describe('ImportAnnotationsService', () => {
}); });
}); });
it('can save many annotations', async () => {
  // Use a total that exceeds the max number of concurrent imports.
  const totalAnns = 23;
  const svc = createService();
  const anns = Array.from({ length: totalAnns }, () => generateAnnotation());

  await svc.import(anns);

  // Every annotation must have been saved exactly once overall...
  assert.equal(fakeAnnotationsService.save.callCount, anns.length);

  // ...and each one must have been passed to `save` with the expected tag.
  anns.forEach(ann => {
    assert.calledWith(fakeAnnotationsService.save, {
      $tag: 'dummy',
      ...ann,
    });
  });
});
it('does not skip annotation if existing annotations in store differ', async () => { it('does not skip annotation if existing annotations in store differ', async () => {
const svc = createService(); const svc = createService();
const ann = generateAnnotation(); const ann = generateAnnotation();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment