From 0f01b0cb4ee5c8ece50e0fded72957b896bc157c Mon Sep 17 00:00:00 2001 From: GeorgeWebberley Date: Sun, 1 Mar 2026 12:47:00 +0100 Subject: [PATCH] Finalised queue with sandboxed mode for child process --- .env.example | 6 ++- package-lock.json | 45 ++++++++++++++++--- package.json | 1 + .../migration.sql | 3 ++ .../migration.sql | 7 +++ prisma/schema.prisma | 38 ++++++++++------ src/cases/cases.module.ts | 17 ++++++- src/cases/cases.resolver.ts | 9 +++- src/cases/cases.service.ts | 18 ++++++++ src/cases/entities/case-law.entity.ts | 15 ++++++- src/cases/processors/case-queue.listener.ts | 8 ++-- src/cases/processors/case.worker.ts | 26 +++++++++++ 12 files changed, 163 insertions(+), 30 deletions(-) create mode 100644 prisma/migrations/20260301113941_add_metadata_storage_key/migration.sql create mode 100644 prisma/migrations/20260301114058_add_status_logs/migration.sql create mode 100644 src/cases/processors/case.worker.ts diff --git a/.env.example b/.env.example index 4dc8043..1e41266 100644 --- a/.env.example +++ b/.env.example @@ -18,4 +18,8 @@ STORAGE_REGION=us-east-1 STORAGE_FORCE_PATH_STYLE=true # AI Config -GOOGLE_API_KEY=your_gemini_api_key_here \ No newline at end of file +GOOGLE_API_KEY=your_gemini_api_key_here + +# Redis Config +REDIS_HOST=localhost +REDIS_PORT=6379 \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index f487519..803bd1b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -25,6 +25,7 @@ "@nestjs/platform-express": "^11.0.1", "@prisma/adapter-pg": "^7.4.2", "@prisma/client": "^7.4.2", + "axios": "^1.13.6", "bullmq": "^5.70.1", "cheerio": "^1.2.0", "class-transformer": "^0.5.1", @@ -6885,7 +6886,6 @@ "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", "integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==", - "dev": true, "license": "MIT" }, "node_modules/available-typed-arrays": { @@ -6913,6 +6913,17 @@ "node": ">= 6.0.0" } }, + "node_modules/axios": { + "version": "1.13.6", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.13.6.tgz", + "integrity": "sha512-ChTCHMouEe2kn713WHbQGcuYrr6fXTBiu460OTwWrWob16g1bXn4vtz07Ope7ewMozJAnEquLk5lWQWtBig9DQ==", + "license": "MIT", + "dependencies": { + "follow-redirects": "^1.15.11", + "form-data": "^4.0.5", + "proxy-from-env": "^1.1.0" + } + }, "node_modules/babel-jest": { "version": "30.2.0", "resolved": "https://registry.npmjs.org/babel-jest/-/babel-jest-30.2.0.tgz", @@ -7681,7 +7692,6 @@ "version": "1.0.8", "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz", "integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==", - "dev": true, "license": "MIT", "dependencies": { "delayed-stream": "~1.0.0" @@ -8058,7 +8068,6 @@ "version": "1.0.0", "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", "integrity": "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==", - "dev": true, "license": "MIT", "engines": { "node": ">=0.4.0" @@ -8398,7 +8407,6 @@ "version": "2.1.0", "resolved": "https://registry.npmjs.org/es-set-tostringtag/-/es-set-tostringtag-2.1.0.tgz", "integrity": "sha512-j6vWzfrGVfyXxge+O0x5sh6cvxAog0a/4Rdd2K36zCMV5eJ+/+tOAngRO8cODMNWbVRdVlmGZQL2YS3yR8bIUA==", - "dev": true, "license": "MIT", "dependencies": { "es-errors": "^1.3.0", @@ -9064,6 +9072,26 @@ "dev": true, "license": "ISC" }, + "node_modules/follow-redirects": { + "version": "1.15.11", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.11.tgz", + "integrity": "sha512-deG2P0JfjrTxl50XGCDyfI97ZGVCxIpfKYmfyrQ54n5FO/0gfIES8C/Psl6kWVDolizcaaxZJnTS0QSMxvnsBQ==", + "funding": [ + { + "type": "individual", + "url": "https://github.com/sponsors/RubenVerborgh" + } + ], + "license": "MIT", + "engines": { + "node": ">=4.0" + }, + "peerDependenciesMeta": { + "debug": { + "optional": true + } + } + }, "node_modules/for-each": { "version": "0.3.5", "resolved": "https://registry.npmjs.org/for-each/-/for-each-0.3.5.tgz", @@ -9128,7 +9156,6 @@ "version": "4.0.5", "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.5.tgz", "integrity": "sha512-8RipRLol37bNs2bhoV67fiTEvdTrbMUYcFTiy3+wuuOnUog2QBHCZWXDRijWQfAkhBj2Uf5UnVaiWwA5vdd82w==", - "dev": true, "license": "MIT", "dependencies": { "asynckit": "^0.4.0", @@ -9145,7 +9172,6 @@ "version": "1.52.0", "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz", "integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==", - "dev": true, "license": "MIT", "engines": { "node": ">= 0.6" @@ -9155,7 +9181,6 @@ "version": "2.1.35", "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz", "integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==", - "dev": true, "license": "MIT", "dependencies": { "mime-db": "1.52.0" @@ -12592,6 +12617,12 @@ "node": ">= 0.10" } }, + "node_modules/proxy-from-env": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz", + "integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==", + "license": "MIT" + }, "node_modules/pure-rand": { "version": "7.0.1", "resolved": "https://registry.npmjs.org/pure-rand/-/pure-rand-7.0.1.tgz", diff --git a/package.json b/package.json index d8a958a..bfbde88 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "@nestjs/platform-express": "^11.0.1", "@prisma/adapter-pg": "^7.4.2", "@prisma/client": "^7.4.2", + "axios": "^1.13.6", "bullmq": "^5.70.1", "cheerio": "^1.2.0", "class-transformer": "^0.5.1", diff --git a/prisma/migrations/20260301113941_add_metadata_storage_key/migration.sql b/prisma/migrations/20260301113941_add_metadata_storage_key/migration.sql new file mode 100644 index 0000000..cbe36a5 --- /dev/null +++ b/prisma/migrations/20260301113941_add_metadata_storage_key/migration.sql @@ -0,0 +1,3 @@ +-- AlterTable +ALTER TABLE "CaseLaw" ADD COLUMN "metadata" JSONB, +ADD COLUMN "storageKey" TEXT; diff --git a/prisma/migrations/20260301114058_add_status_logs/migration.sql b/prisma/migrations/20260301114058_add_status_logs/migration.sql new file mode 100644 index 0000000..7399231 --- /dev/null +++ b/prisma/migrations/20260301114058_add_status_logs/migration.sql @@ -0,0 +1,7 @@ +-- CreateEnum +CREATE TYPE "CaseStatus" AS ENUM ('PENDING', 'PROCESSING', 'COMPLETED', 'FAILED'); + +-- AlterTable +ALTER TABLE "CaseLaw" ADD COLUMN "logs" TEXT[], +ADD COLUMN "processingError" TEXT, +ADD COLUMN "status" "CaseStatus" NOT NULL DEFAULT 'PENDING'; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 3f59e68..1a1971a 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -13,18 +13,28 @@ datasource db { provider = "postgresql" } -model CaseLaw { - id String @id @default(uuid()) - title String - decisionType String? - decisionDate DateTime? - office String? - court String? - caseNumber String? - summary String? @db.Text - storageKey String? - fileType String - metadata Json? - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt +enum CaseStatus { + PENDING + PROCESSING + COMPLETED + FAILED +} + +model CaseLaw { + id String @id @default(uuid()) + title String + decisionType String? + decisionDate DateTime? + office String? + court String? + caseNumber String? + summary String? @db.Text + storageKey String? + fileType String + status CaseStatus @default(PENDING) + processingError String? + logs String[] + metadata Json? + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt } diff --git a/src/cases/cases.module.ts b/src/cases/cases.module.ts index b583b81..fe3cff6 100644 --- a/src/cases/cases.module.ts +++ b/src/cases/cases.module.ts @@ -1,8 +1,23 @@ import { Module } from '@nestjs/common'; +import { BullModule } from '@nestjs/bullmq'; +import { join } from 'path'; import { CasesService } from './cases.service'; import { CasesResolver } from './cases.resolver'; +import { ParserService } from './parser/parser.service'; +import { CaseQueueListener } from './processors/case-queue.listener'; @Module({ - providers: [CasesService, CasesResolver] + imports: [ + BullModule.registerQueue({ + name: 'case-processing', + processors: [ + { + path: join(__dirname, 'processors', 'case.worker.js'), + }, + ], + }), + ], + providers: [CasesService, CasesResolver, ParserService, CaseQueueListener], }) export class CasesModule {} + diff --git a/src/cases/cases.resolver.ts b/src/cases/cases.resolver.ts index 44ea432..9471043 100644 --- a/src/cases/cases.resolver.ts +++ b/src/cases/cases.resolver.ts @@ -3,8 +3,9 @@ import { GraphQLUpload } from 'graphql-upload-ts'; import type { FileUpload } from 'graphql-upload-ts'; import { CasesService } from './cases.service'; import { CaseLaw } from './entities/case-law.entity'; -import { CaseFileValidationPipe } from 'src/common/pipes/file-validation.pipe'; -import { StorageService } from 'src/common/storage/storage.service'; +import { StorageService } from '../common/storage/storage.service'; +import { CaseFileValidationPipe } from '../common/pipes/file-validation.pipe'; +import { CaseStatus } from '../generated/prisma/client.js'; @Resolver(() => CaseLaw) export class CasesResolver { @@ -14,6 +15,8 @@ export class CasesResolver { ) {} + + @Query(() => CaseLaw, { name: 'caseLaw', nullable: true }) async findOne( @Args('id', { type: () => String, nullable: true }) id?: string, @@ -41,6 +44,8 @@ export class CasesResolver { } const buffer = Buffer.concat(chunks); + // Buffering into memory like this might not be perfectly "scalable" for 1GB files, + // but for this project and 10MB limit it's simpler to handle than passing a stream. return this.casesService.processAndSave(buffer, mimetype, filename); } diff --git a/src/cases/cases.service.ts b/src/cases/cases.service.ts index 56e6f7d..7d4ad2f 100644 --- a/src/cases/cases.service.ts +++ b/src/cases/cases.service.ts @@ -1,6 +1,10 @@ import { Injectable, NotFoundException, BadRequestException, Logger, Inject } from '@nestjs/common'; import { PRISMA_CLIENT, type PrismaClientInstance } from '../common/prisma/prisma.service'; import { isUuid } from 'src/common/utils/string.utils'; +import { Queue } from 'bullmq'; +import { StorageService } from 'src/common/storage/storage.service'; +import { InjectQueue } from '@nestjs/bullmq'; +import { CaseStatus } from 'src/generated/prisma/enums'; @Injectable() export class CasesService { @@ -8,19 +12,33 @@ export class CasesService { constructor( @Inject(PRISMA_CLIENT) private prisma: PrismaClientInstance, + private storage: StorageService, + @InjectQueue('case-processing') private caseQueue: Queue, ) {} async processAndSave(buffer: Buffer, mimetype: string, filename: string) { + this.logger.log(`Upload received: ${filename} (${mimetype}, ${(buffer.length / 1024).toFixed(1)} KB)`); + const storageKey = await this.storage.upload(buffer, filename, mimetype); const fileType = mimetype === 'application/pdf' ? 'PDF' : 'HTML'; const caseLaw = await this.prisma.caseLaw.create({ data: { title: `Processing: ${filename}`, fileType, + storageKey, + status: CaseStatus.PENDING, }, }); + const workerDownloadUrl = await this.storage.getPresignedUrl(storageKey); + + await this.caseQueue.add('parse-case', { + caseId: caseLaw.id, + downloadUrl: workerDownloadUrl, + mimetype, + }); + return caseLaw; } diff --git a/src/cases/entities/case-law.entity.ts b/src/cases/entities/case-law.entity.ts index 02bd933..3bdecf1 100644 --- a/src/cases/entities/case-law.entity.ts +++ b/src/cases/entities/case-law.entity.ts @@ -1,6 +1,10 @@ -import { ObjectType, Field, ID } from '@nestjs/graphql'; +import { ObjectType, Field, ID, registerEnumType } from '@nestjs/graphql'; import { GraphQLJSON } from 'graphql-type-json'; +import { CaseStatus } from '../../generated/prisma/client.js'; +registerEnumType(CaseStatus, { + name: 'CaseStatus', +}); @ObjectType() export class CaseLaw { @@ -34,9 +38,18 @@ export class CaseLaw { @Field() fileType: string; + @Field(() => [String], { defaultValue: [] }) + logs: string[]; + @Field(() => GraphQLJSON, { nullable: true }) metadata?: any; + @Field(() => CaseStatus) + status: CaseStatus; + + @Field({ nullable: true }) + processingError?: string; + @Field() createdAt: Date; diff --git a/src/cases/processors/case-queue.listener.ts b/src/cases/processors/case-queue.listener.ts index ebd7ae0..324c387 100644 --- a/src/cases/processors/case-queue.listener.ts +++ b/src/cases/processors/case-queue.listener.ts @@ -1,15 +1,15 @@ import { QueueEventsListener, QueueEventsHost, OnQueueEvent, InjectQueue } from '@nestjs/bullmq'; import { Queue } from 'bullmq'; -import { PrismaService } from '../../common/prisma/prisma.service'; -import { CaseStatus } from '@prisma/client'; -import { Logger } from '@nestjs/common'; +import { PRISMA_CLIENT, type PrismaClientInstance } from '../../common/prisma/prisma.service'; +import { CaseStatus } from '../../generated/prisma/client.js'; +import { Logger, Inject } from '@nestjs/common'; @QueueEventsListener('case-processing') export class CaseQueueListener extends QueueEventsHost { private readonly logger = new Logger(CaseQueueListener.name); constructor( - private prisma: PrismaService, + @Inject(PRISMA_CLIENT) private prisma: PrismaClientInstance, @InjectQueue('case-processing') private readonly queue: Queue, ) { super(); diff --git a/src/cases/processors/case.worker.ts b/src/cases/processors/case.worker.ts new file mode 100644 index 0000000..441557a --- /dev/null +++ b/src/cases/processors/case.worker.ts @@ -0,0 +1,26 @@ +import 'dotenv/config'; +import { Job } from 'bullmq'; +import axios from 'axios'; +import { ParserService } from '../parser/parser.service'; + +// I chose a sandboxed worker here (separate thread/process) because PDF parsing and +// AI calls can be surprisingly CPU heavy. If we did this on the main event loop, +// the whole API might lag while one person uploads a massive legal doc. +// This keeps the API snappy while the heavy lifting happens in the background. +export default async function (job: Job) { + const { downloadUrl, mimetype, caseId } = job.data; + try { + // Download + await job.updateProgress('📡 Downloading Case File from storage...'); + const response = await axios.get(downloadUrl, { responseType: 'arraybuffer' }); + const buffer = Buffer.from(response.data); + await job.updateProgress(`✅ File downloaded (${(buffer.length / 1024).toFixed(1)} KB)`); + + // Parse + await job.updateProgress('🔍 Extracting text content...'); + const result = await ParserService.parse(buffer, mimetype, job); + return result; + } catch (error) { + throw new Error(`Sandboxed parsing failed: ${error.message}`); + } +}