From f1cf8f8a88d198f2aee8782936c9bc161f7003e6 Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Wed, 25 Oct 2023 15:43:50 -0400 Subject: [PATCH 1/3] Add GC-safe regions around some `ccall`s Specifically, around `LZ4_compress_fast`, `LZ4_compress_destSize` and `LZ4_decompress_safe`. If these are called on large data (on the order of GBs), the calls could take multiple seconds during which GC cannot run without these GC-safe regions. --- src/headers/lz4.jl | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/src/headers/lz4.jl b/src/headers/lz4.jl index 374b3cd..76e96e1 100644 --- a/src/headers/lz4.jl +++ b/src/headers/lz4.jl @@ -46,7 +46,15 @@ This function never writes outside `dst` buffer, nor read outside `source` buffe Returns the number of bytes written into buffer `dst` (necessarily <= dstcapacity) """ function LZ4_compress_fast(src, dst, srcsize, dstcapacity, acceleration=1) - ret = ccall((:LZ4_compress_fast, liblz4), Cint, (Ptr{UInt8}, Ptr{UInt8}, Cint, Cint, Cint), src, dst, srcsize, dstcapacity, acceleration) + csrc = Base.cconvert(Ptr{UInt8}, src) + cdst = Base.cconvert(Ptr{UInt8}, dst) + GC.@preserve csrc cdst begin + # Allow Julia to GC while compressing + gc_state = @ccall(jl_gc_safe_enter()::Int8) + ret = ccall((:LZ4_compress_fast, liblz4), Cint, (Ptr{UInt8}, Ptr{UInt8}, Cint, Cint, Cint), Base.unsafe_convert(Ptr{UInt8}, csrc)::Ptr{UInt8}, Base.unsafe_convert(Ptr{UInt8}, cdst)::Ptr{UInt8}, srcsize, dstcapacity, acceleration) + # Leave GC-safe region, waiting for GC to complete if it's running + @ccall(jl_gc_safe_leave(gc_state::Int8)::Cvoid) + end check_compression_error(ret, "LZ4_compress_fast") end @@ -63,7 +71,16 @@ or fill `dst` buffer completely with as much data as possible from `src`. 
Returns number of bytes written into `dst` (necessarily <= dstcapacity) """ function LZ4_compress_destSize(src, dst, srcsize, dstcapacity) - ret = ccall((:LZ4_compress_destSize, liblz4), Cint, (Ptr{UInt8}, Ptr{UInt8}, Ptr{Cint}, Cint), src, dst, srcsize, dstcapacity) + csrc = Base.cconvert(Ptr{UInt8}, src) + cdst = Base.cconvert(Ptr{UInt8}, dst) + csrcsize = Base.cconvert(Ptr{Cint}, srcsize) + GC.@preserve csrc cdst csrcsize begin + # Allow Julia to GC while compressing + gc_state = @ccall(jl_gc_safe_enter()::Int8) + ret = ccall((:LZ4_compress_destSize, liblz4), Cint, (Ptr{UInt8}, Ptr{UInt8}, Ptr{Cint}, Cint), Base.unsafe_convert(Ptr{UInt8}, csrc)::Ptr{UInt8}, Base.unsafe_convert(Ptr{UInt8}, cdst)::Ptr{UInt8}, Base.unsafe_convert(Ptr{Cint}, csrcsize)::Ptr{Cint}, dstcapacity) + # Leave GC-safe region, waiting for GC to complete if it's running + @ccall(jl_gc_safe_leave(gc_state::Int8)::Cvoid) + end check_compression_error(ret, "LZ4_compress_destSize") end @@ -150,7 +167,15 @@ dstcapacity : is the size of destination buffer, which must be already allocated Returns the number of bytes decompressed into destination buffer (necessarily <= dstcapacity) """ function LZ4_decompress_safe(src, dst, cmpsize, dstcapacity) - ret = ccall((:LZ4_decompress_safe, liblz4), Cint, (Ptr{UInt8}, Ptr{UInt8}, Cint, Cint), src, dst, cmpsize, dstcapacity) + csrc = Base.cconvert(Ptr{UInt8}, src) + cdst = Base.cconvert(Ptr{UInt8}, dst) + GC.@preserve csrc cdst begin + # Allow Julia to GC while decompressing + gc_state = @ccall(jl_gc_safe_enter()::Int8) + ret = ccall((:LZ4_decompress_safe, liblz4), Cint, (Ptr{UInt8}, Ptr{UInt8}, Cint, Cint), Base.unsafe_convert(Ptr{UInt8}, csrc)::Ptr{UInt8}, Base.unsafe_convert(Ptr{UInt8}, cdst)::Ptr{UInt8}, cmpsize, dstcapacity) + # Leave GC-safe region, waiting for GC to complete if it's running + @ccall(jl_gc_safe_leave(gc_state::Int8)::Cvoid) + end check_decompression_error(ret, "LZ4_decompress_safe") end From db79d9c4c5a8b7ddbf800512d33b1f68dc69c8a4 Mon 
Sep 17 00:00:00 2001 From: K Pamnany Date: Thu, 2 Nov 2023 20:17:52 -0400 Subject: [PATCH 2/3] Preserve `out_buffer` in `process()` --- src/lz4_compression.jl | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/lz4_compression.jl b/src/lz4_compression.jl index f5064b3..cc68f76 100644 --- a/src/lz4_compression.jl +++ b/src/lz4_compression.jl @@ -154,18 +154,20 @@ function TranscodingStreams.process( out_buffer = Vector{UInt8}(undef, LZ4_compressBound(data_size)) unsafe_copyto!(in_buffer, input.ptr, data_size) - compressed_size = LZ4_compress_fast_continue( - codec.streamptr, - in_buffer, - pointer(out_buffer), - data_size, - length(out_buffer), - codec.acceleration, - ) - - checkbounds(output, compressed_size + CINT_SIZE) - writeint(output, compressed_size) - unsafe_copyto!(output.ptr + CINT_SIZE, pointer(out_buffer), compressed_size) + GC.@preserve out_buffer begin + compressed_size = LZ4_compress_fast_continue( + codec.streamptr, + in_buffer, + pointer(out_buffer), + data_size, + length(out_buffer), + codec.acceleration, + ) + + checkbounds(output, compressed_size + CINT_SIZE) + writeint(output, compressed_size) + unsafe_copyto!(output.ptr + CINT_SIZE, pointer(out_buffer), compressed_size) + end return (data_size, compressed_size + CINT_SIZE, :ok) catch err From b55c56d6c0a747695c3b3bb70e10993bdd05cc9a Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Thu, 1 Feb 2024 20:27:47 -0500 Subject: [PATCH 3/3] Update lz4.jl: convert ccall arguments before entering the GC-safe region --- src/headers/lz4.jl | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/src/headers/lz4.jl b/src/headers/lz4.jl index 76e96e1..7a95368 100644 --- a/src/headers/lz4.jl +++ b/src/headers/lz4.jl @@ -46,12 +46,17 @@ This function never writes outside `dst` buffer, nor read outside `source` buffe Returns the number of bytes written into buffer `dst` (necessarily <= dstcapacity) """ function LZ4_compress_fast(src, dst, srcsize, dstcapacity, 
acceleration=1) - csrc = Base.cconvert(Ptr{UInt8}, src) - cdst = Base.cconvert(Ptr{UInt8}, dst) - GC.@preserve csrc cdst begin + src = Base.cconvert(Ptr{UInt8}, src) + dst = Base.cconvert(Ptr{UInt8}, dst) + csrc = Base.unsafe_convert(Ptr{UInt8}, src)::Ptr{UInt8} + cdst = Base.unsafe_convert(Ptr{UInt8}, dst)::Ptr{UInt8} + srcsize = convert(Cint, srcsize)::Cint + dstcapacity = convert(Cint, dstcapacity)::Cint + acceleration = convert(Cint, acceleration)::Cint + GC.@preserve src dst begin # Allow Julia to GC while compressing gc_state = @ccall(jl_gc_safe_enter()::Int8) - ret = ccall((:LZ4_compress_fast, liblz4), Cint, (Ptr{UInt8}, Ptr{UInt8}, Cint, Cint, Cint), Base.unsafe_convert(Ptr{UInt8}, csrc)::Ptr{UInt8}, Base.unsafe_convert(Ptr{UInt8}, cdst)::Ptr{UInt8}, srcsize, dstcapacity, acceleration) + ret = ccall((:LZ4_compress_fast, liblz4), Cint, (Ptr{UInt8}, Ptr{UInt8}, Cint, Cint, Cint), csrc, cdst, srcsize, dstcapacity, acceleration) # Leave GC-safe region, waiting for GC to complete if it's running @ccall(jl_gc_safe_leave(gc_state::Int8)::Cvoid) end @@ -71,13 +76,17 @@ or fill `dst` buffer completely with as much data as possible from `src`. 
Returns number of bytes written into `dst` (necessarily <= dstcapacity) """ function LZ4_compress_destSize(src, dst, srcsize, dstcapacity) - csrc = Base.cconvert(Ptr{UInt8}, src) - cdst = Base.cconvert(Ptr{UInt8}, dst) - csrcsize = Base.cconvert(Ptr{Cint}, srcsize) - GC.@preserve csrc cdst csrcsize begin + src = Base.cconvert(Ptr{UInt8}, src) + dst = Base.cconvert(Ptr{UInt8}, dst) + srcsize = Base.cconvert(Ptr{Cint}, srcsize) + csrc = Base.unsafe_convert(Ptr{UInt8}, src)::Ptr{UInt8} + cdst = Base.unsafe_convert(Ptr{UInt8}, dst)::Ptr{UInt8} + csrcsize = Base.unsafe_convert(Ptr{Cint}, srcsize)::Ptr{Cint} + dstcapacity = convert(Cint, dstcapacity)::Cint + GC.@preserve src dst srcsize begin # Allow Julia to GC while compressing gc_state = @ccall(jl_gc_safe_enter()::Int8) - ret = ccall((:LZ4_compress_destSize, liblz4), Cint, (Ptr{UInt8}, Ptr{UInt8}, Ptr{Cint}, Cint), Base.unsafe_convert(Ptr{UInt8}, csrc)::Ptr{UInt8}, Base.unsafe_convert(Ptr{UInt8}, cdst)::Ptr{UInt8}, Base.unsafe_convert(Ptr{Cint}, csrcsize)::Ptr{Cint}, dstcapacity) + ret = ccall((:LZ4_compress_destSize, liblz4), Cint, (Ptr{UInt8}, Ptr{UInt8}, Ptr{Cint}, Cint), csrc, cdst, csrcsize, dstcapacity) # Leave GC-safe region, waiting for GC to complete if it's running @ccall(jl_gc_safe_leave(gc_state::Int8)::Cvoid) end @@ -167,12 +176,16 @@ dstcapacity : is the size of destination buffer, which must be already allocated Returns the number of bytes decompressed into destination buffer (necessarily <= dstcapacity) """ function LZ4_decompress_safe(src, dst, cmpsize, dstcapacity) - csrc = Base.cconvert(Ptr{UInt8}, src) - cdst = Base.cconvert(Ptr{UInt8}, dst) - GC.@preserve csrc cdst begin + src = Base.cconvert(Ptr{UInt8}, src) + dst = Base.cconvert(Ptr{UInt8}, dst) + csrc = Base.unsafe_convert(Ptr{UInt8}, src)::Ptr{UInt8} + cdst = Base.unsafe_convert(Ptr{UInt8}, dst)::Ptr{UInt8} + cmpsize = convert(Cint, cmpsize)::Cint + dstcapacity = convert(Cint, dstcapacity)::Cint + GC.@preserve src dst begin # Allow Julia to GC 
while decompressing gc_state = @ccall(jl_gc_safe_enter()::Int8) - ret = ccall((:LZ4_decompress_safe, liblz4), Cint, (Ptr{UInt8}, Ptr{UInt8}, Cint, Cint), Base.unsafe_convert(Ptr{UInt8}, csrc)::Ptr{UInt8}, Base.unsafe_convert(Ptr{UInt8}, cdst)::Ptr{UInt8}, cmpsize, dstcapacity) + ret = ccall((:LZ4_decompress_safe, liblz4), Cint, (Ptr{UInt8}, Ptr{UInt8}, Cint, Cint),csrc, cdst, cmpsize, dstcapacity) # Leave GC-safe region, waiting for GC to complete if it's running @ccall(jl_gc_safe_leave(gc_state::Int8)::Cvoid) end