From c3e296e334b0be3f2da2e9bb92f01aca492b45f7 Mon Sep 17 00:00:00 2001 From: blperf <> Date: Sun, 29 Oct 2023 18:42:30 -0400 Subject: [PATCH] Adding initial fix for bug issue #1967 --- .../generated/ExpressMiddlewareFactory.ts | 6 ++-- src/blob/generated/handlers/IBlobHandler.ts | 1 + src/blob/handlers/BlobHandler.ts | 9 +++++ src/common/persistence/FSExtentStore.ts | 33 +++++++++++++++++++ src/common/persistence/IExtentStore.ts | 5 +++ 5 files changed, 52 insertions(+), 2 deletions(-) diff --git a/src/blob/generated/ExpressMiddlewareFactory.ts b/src/blob/generated/ExpressMiddlewareFactory.ts index 8ece08df0..08fb277c0 100644 --- a/src/blob/generated/ExpressMiddlewareFactory.ts +++ b/src/blob/generated/ExpressMiddlewareFactory.ts @@ -92,13 +92,15 @@ export default class ExpressMiddlewareFactory extends MiddlewareFactory { handlers, this.logger ); - return (req: Request, res: Response, next: NextFunction) => { + return (req: Request, res: Response, next: NextFunction) => { const request = new ExpressRequestAdapter(req); const response = new ExpressResponseAdapter(res); + let newContext = new Context(res.locals, this.contextPath, request, response); handlerMiddlewareFactory.createHandlerMiddleware()( - new Context(res.locals, this.contextPath, request, response), + newContext, next ); + res.on("close", () => handlers.blobHandler.cleanUpBlob(newContext)); }; } diff --git a/src/blob/generated/handlers/IBlobHandler.ts b/src/blob/generated/handlers/IBlobHandler.ts index da348f143..e825047a3 100644 --- a/src/blob/generated/handlers/IBlobHandler.ts +++ b/src/blob/generated/handlers/IBlobHandler.ts @@ -14,6 +14,7 @@ import Context from "../Context"; export default interface IBlobHandler { download(options: Models.BlobDownloadOptionalParams, context: Context): Promise; + cleanUpBlob(context: Context): Promise; getProperties(options: Models.BlobGetPropertiesOptionalParams, context: Context): Promise; delete(options: Models.BlobDeleteMethodOptionalParams, context: Context): Promise; undelete(options: Models.BlobUndeleteOptionalParams, context: Context): Promise; diff --git a/src/blob/handlers/BlobHandler.ts b/src/blob/handlers/BlobHandler.ts index b9da01ae2..ecfe4dca5 100644 --- a/src/blob/handlers/BlobHandler.ts +++ b/src/blob/handlers/BlobHandler.ts @@ -91,6 +91,15 @@ export default class BlobHandler extends BaseHandler implements IBlobHandler { } } + /** + * Clean up file handles for blob to prevent leak. + * + * @memberof BlobHandler + */ + public async cleanUpBlob(context: Context): Promise { + this.extentStore.cleanStreams(context.contextId); + } + /** * Get blob properties. * diff --git a/src/common/persistence/FSExtentStore.ts b/src/common/persistence/FSExtentStore.ts index 199a8012d..2fc75e847 100644 --- a/src/common/persistence/FSExtentStore.ts +++ b/src/common/persistence/FSExtentStore.ts @@ -5,6 +5,7 @@ import { fdatasync, mkdir, open, + ReadStream, stat, unlink } from "fs"; @@ -304,6 +305,8 @@ export default class FSExtentStore implements IExtentStore { return this.appendQueue.operate(op, contextId); } + private streams: Map = new Map(); + /** * Read data from persistency layer according to the given IExtentChunk. * @@ -353,12 +356,42 @@ export default class FSExtentStore implements IExtentStore { contextId ); }); + + if (contextId != null) { + let existingStreams = this.streams.get(contextId); + if (existingStreams == null) { + let newStreamsArray: ReadStream[] = [stream]; + this.streams.set(contextId, newStreamsArray); + } + else { + existingStreams.push(stream); + this.streams.set(contextId, existingStreams); + } + } + resolve(stream); }); return this.readQueue.operate(op, contextId); } + public async cleanStreams(contextId?: string): Promise { + this.logger.verbose( + "FSExtentStore:cleanStreams() Response object closed unexpectedly, cleaning up after streams", + contextId + ); + + if (contextId != null) { + let streamsToCleanup = this.streams.get(contextId); + + if (streamsToCleanup != null) { + for (const stream of streamsToCleanup) { + stream.destroy(); + } + } + } + } + /** * Merge several extent chunks to a ReadableStream according to the offset and count. * diff --git a/src/common/persistence/IExtentStore.ts b/src/common/persistence/IExtentStore.ts index a7de1a0f1..16d512049 100644 --- a/src/common/persistence/IExtentStore.ts +++ b/src/common/persistence/IExtentStore.ts @@ -59,6 +59,11 @@ export default interface IExtentStore extends IDataStore, ICleaner { contextId?: string ): Promise; + /** + * Clean up file handles from an extent when a response object is closed + */ + cleanStreams(contextId?: string): Promise; + /** * Merge several extent chunks to a ReadableStream according to the offset and count. *