Finalised queue with sandboxed mode for child process

This commit is contained in:
GeorgeWebberley 2026-03-01 12:47:00 +01:00
parent 7369914492
commit 0f01b0cb4e
12 changed files with 163 additions and 30 deletions

View file

@ -18,4 +18,8 @@ STORAGE_REGION=us-east-1
STORAGE_FORCE_PATH_STYLE=true
# AI Config
GOOGLE_API_KEY=your_gemini_api_key_here
GOOGLE_API_KEY=your_gemini_api_key_here
# Redis Config
REDIS_HOST=localhost
REDIS_PORT=6379

45
package-lock.json generated
View file

@ -25,6 +25,7 @@
"@nestjs/platform-express": "^11.0.1",
"@prisma/adapter-pg": "^7.4.2",
"@prisma/client": "^7.4.2",
"axios": "^1.13.6",
"bullmq": "^5.70.1",
"cheerio": "^1.2.0",
"class-transformer": "^0.5.1",
@ -6885,7 +6886,6 @@
"version": "0.4.0",
"resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz",
"integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==",
"dev": true,
"license": "MIT"
},
"node_modules/available-typed-arrays": {
@ -6913,6 +6913,17 @@
"node": ">= 6.0.0"
}
},
"node_modules/axios": {
"version": "1.13.6",
"resolved": "https://registry.npmjs.org/axios/-/axios-1.13.6.tgz",
"integrity": "sha512-ChTCHMouEe2kn713WHbQGcuYrr6fXTBiu460OTwWrWob16g1bXn4vtz07Ope7ewMozJAnEquLk5lWQWtBig9DQ==",
"license": "MIT",
"dependencies": {
"follow-redirects": "^1.15.11",
"form-data": "^4.0.5",
"proxy-from-env": "^1.1.0"
}
},
"node_modules/babel-jest": {
"version": "30.2.0",
"resolved": "https://registry.npmjs.org/babel-jest/-/babel-jest-30.2.0.tgz",
@ -7681,7 +7692,6 @@
"version": "1.0.8",
"resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz",
"integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==",
"dev": true,
"license": "MIT",
"dependencies": {
"delayed-stream": "~1.0.0"
@ -8058,7 +8068,6 @@
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz",
"integrity": "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==",
"dev": true,
"license": "MIT",
"engines": {
"node": ">=0.4.0"
@ -8398,7 +8407,6 @@
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/es-set-tostringtag/-/es-set-tostringtag-2.1.0.tgz",
"integrity": "sha512-j6vWzfrGVfyXxge+O0x5sh6cvxAog0a/4Rdd2K36zCMV5eJ+/+tOAngRO8cODMNWbVRdVlmGZQL2YS3yR8bIUA==",
"dev": true,
"license": "MIT",
"dependencies": {
"es-errors": "^1.3.0",
@ -9064,6 +9072,26 @@
"dev": true,
"license": "ISC"
},
"node_modules/follow-redirects": {
"version": "1.15.11",
"resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.11.tgz",
"integrity": "sha512-deG2P0JfjrTxl50XGCDyfI97ZGVCxIpfKYmfyrQ54n5FO/0gfIES8C/Psl6kWVDolizcaaxZJnTS0QSMxvnsBQ==",
"funding": [
{
"type": "individual",
"url": "https://github.com/sponsors/RubenVerborgh"
}
],
"license": "MIT",
"engines": {
"node": ">=4.0"
},
"peerDependenciesMeta": {
"debug": {
"optional": true
}
}
},
"node_modules/for-each": {
"version": "0.3.5",
"resolved": "https://registry.npmjs.org/for-each/-/for-each-0.3.5.tgz",
@ -9128,7 +9156,6 @@
"version": "4.0.5",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.5.tgz",
"integrity": "sha512-8RipRLol37bNs2bhoV67fiTEvdTrbMUYcFTiy3+wuuOnUog2QBHCZWXDRijWQfAkhBj2Uf5UnVaiWwA5vdd82w==",
"dev": true,
"license": "MIT",
"dependencies": {
"asynckit": "^0.4.0",
@ -9145,7 +9172,6 @@
"version": "1.52.0",
"resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz",
"integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==",
"dev": true,
"license": "MIT",
"engines": {
"node": ">= 0.6"
@ -9155,7 +9181,6 @@
"version": "2.1.35",
"resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz",
"integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==",
"dev": true,
"license": "MIT",
"dependencies": {
"mime-db": "1.52.0"
@ -12592,6 +12617,12 @@
"node": ">= 0.10"
}
},
"node_modules/proxy-from-env": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz",
"integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==",
"license": "MIT"
},
"node_modules/pure-rand": {
"version": "7.0.1",
"resolved": "https://registry.npmjs.org/pure-rand/-/pure-rand-7.0.1.tgz",

View file

@ -36,6 +36,7 @@
"@nestjs/platform-express": "^11.0.1",
"@prisma/adapter-pg": "^7.4.2",
"@prisma/client": "^7.4.2",
"axios": "^1.13.6",
"bullmq": "^5.70.1",
"cheerio": "^1.2.0",
"class-transformer": "^0.5.1",

View file

@ -0,0 +1,3 @@
-- AlterTable
ALTER TABLE "CaseLaw" ADD COLUMN "metadata" JSONB,
ADD COLUMN "storageKey" TEXT;

View file

@ -0,0 +1,7 @@
-- CreateEnum
CREATE TYPE "CaseStatus" AS ENUM ('PENDING', 'PROCESSING', 'COMPLETED', 'FAILED');
-- AlterTable
ALTER TABLE "CaseLaw" ADD COLUMN "logs" TEXT[],
ADD COLUMN "processingError" TEXT,
ADD COLUMN "status" "CaseStatus" NOT NULL DEFAULT 'PENDING';

View file

@ -13,18 +13,28 @@ datasource db {
provider = "postgresql"
}
model CaseLaw {
id String @id @default(uuid())
title String
decisionType String?
decisionDate DateTime?
office String?
court String?
caseNumber String?
summary String? @db.Text
storageKey String?
fileType String
metadata Json?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
enum CaseStatus {
PENDING
PROCESSING
COMPLETED
FAILED
}
model CaseLaw {
id String @id @default(uuid())
title String
decisionType String?
decisionDate DateTime?
office String?
court String?
caseNumber String?
summary String? @db.Text
storageKey String?
fileType String
status CaseStatus @default(PENDING)
processingError String?
logs String[]
metadata Json?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
}

View file

@ -1,8 +1,23 @@
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { join } from 'path';
import { CasesService } from './cases.service';
import { CasesResolver } from './cases.resolver';
import { ParserService } from './parser/parser.service';
import { CaseQueueListener } from './processors/case-queue.listener';
@Module({
providers: [CasesService, CasesResolver]
imports: [
BullModule.registerQueue({
name: 'case-processing',
processors: [
{
path: join(__dirname, 'processors', 'case.worker.js'),
},
],
}),
],
providers: [CasesService, CasesResolver, ParserService, CaseQueueListener],
})
export class CasesModule {}

View file

@ -3,8 +3,9 @@ import { GraphQLUpload } from 'graphql-upload-ts';
import type { FileUpload } from 'graphql-upload-ts';
import { CasesService } from './cases.service';
import { CaseLaw } from './entities/case-law.entity';
import { CaseFileValidationPipe } from 'src/common/pipes/file-validation.pipe';
import { StorageService } from 'src/common/storage/storage.service';
import { StorageService } from '../common/storage/storage.service';
import { CaseFileValidationPipe } from '../common/pipes/file-validation.pipe';
import { CaseStatus } from '../generated/prisma/client.js';
@Resolver(() => CaseLaw)
export class CasesResolver {
@ -14,6 +15,8 @@ export class CasesResolver {
) {}
@Query(() => CaseLaw, { name: 'caseLaw', nullable: true })
async findOne(
@Args('id', { type: () => String, nullable: true }) id?: string,
@ -41,6 +44,8 @@ export class CasesResolver {
}
const buffer = Buffer.concat(chunks);
// Buffering into memory like this might not be perfectly "scalable" for 1GB files,
// but for this project and 10MB limit it's simpler to handle than passing a stream.
return this.casesService.processAndSave(buffer, mimetype, filename);
}

View file

@ -1,6 +1,10 @@
import { Injectable, NotFoundException, BadRequestException, Logger, Inject } from '@nestjs/common';
import { PRISMA_CLIENT, type PrismaClientInstance } from '../common/prisma/prisma.service';
import { isUuid } from 'src/common/utils/string.utils';
import { Queue } from 'bullmq';
import { StorageService } from 'src/common/storage/storage.service';
import { InjectQueue } from '@nestjs/bullmq';
import { CaseStatus } from 'src/generated/prisma/enums';
@Injectable()
export class CasesService {
@ -8,19 +12,33 @@ export class CasesService {
constructor(
@Inject(PRISMA_CLIENT) private prisma: PrismaClientInstance,
private storage: StorageService,
@InjectQueue('case-processing') private caseQueue: Queue,
) {}
async processAndSave(buffer: Buffer, mimetype: string, filename: string) {
this.logger.log(`Upload received: ${filename} (${mimetype}, ${(buffer.length / 1024).toFixed(1)} KB)`);
const storageKey = await this.storage.upload(buffer, filename, mimetype);
const fileType = mimetype === 'application/pdf' ? 'PDF' : 'HTML';
const caseLaw = await this.prisma.caseLaw.create({
data: {
title: `Processing: ${filename}`,
fileType,
storageKey,
status: CaseStatus.PENDING,
},
});
const workerDownloadUrl = await this.storage.getPresignedUrl(storageKey);
await this.caseQueue.add('parse-case', {
caseId: caseLaw.id,
downloadUrl: workerDownloadUrl,
mimetype,
});
return caseLaw;
}

View file

@ -1,6 +1,10 @@
import { ObjectType, Field, ID } from '@nestjs/graphql';
import { ObjectType, Field, ID, registerEnumType } from '@nestjs/graphql';
import { GraphQLJSON } from 'graphql-type-json';
import { CaseStatus } from '../../generated/prisma/client.js';
registerEnumType(CaseStatus, {
name: 'CaseStatus',
});
@ObjectType()
export class CaseLaw {
@ -34,9 +38,18 @@ export class CaseLaw {
@Field()
fileType: string;
@Field(() => [String], { defaultValue: [] })
logs: string[];
@Field(() => GraphQLJSON, { nullable: true })
metadata?: any;
@Field(() => CaseStatus)
status: CaseStatus;
@Field({ nullable: true })
processingError?: string;
@Field()
createdAt: Date;

View file

@ -1,15 +1,15 @@
import { QueueEventsListener, QueueEventsHost, OnQueueEvent, InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { PrismaService } from '../../common/prisma/prisma.service';
import { CaseStatus } from '@prisma/client';
import { Logger } from '@nestjs/common';
import { PRISMA_CLIENT, type PrismaClientInstance } from '../../common/prisma/prisma.service';
import { CaseStatus } from '../../generated/prisma/client.js';
import { Logger, Inject } from '@nestjs/common';
@QueueEventsListener('case-processing')
export class CaseQueueListener extends QueueEventsHost {
private readonly logger = new Logger(CaseQueueListener.name);
constructor(
private prisma: PrismaService,
@Inject(PRISMA_CLIENT) private prisma: PrismaClientInstance,
@InjectQueue('case-processing') private readonly queue: Queue,
) {
super();

View file

@ -0,0 +1,26 @@
import 'dotenv/config';
import { Job } from 'bullmq';
import axios from 'axios';
import { ParserService } from '../parser/parser.service';
// I chose a sandboxed worker here (separate thread/process) because PDF parsing and
// AI calls can be surprisingly CPU heavy. If we did this on the main event loop,
// the whole API might lag while one person uploads a massive legal doc.
// This keeps the API snappy while the heavy lifting happens in the background.
export default async function (job: Job) {
const { downloadUrl, mimetype, caseId } = job.data;
try {
// Download
await job.updateProgress('📡 Downloading Case File from storage...');
const response = await axios.get(downloadUrl, { responseType: 'arraybuffer' });
const buffer = Buffer.from(response.data);
await job.updateProgress(`✅ File downloaded (${(buffer.length / 1024).toFixed(1)} KB)`);
// Parse
await job.updateProgress('🔍 Extracting text content...');
const result = await ParserService.parse(buffer, mimetype, job);
return result;
} catch (error) {
throw new Error(`Sandboxed parsing failed: ${error.message}`);
}
}