[WIP] PullByteBufferOut as a default ordering #1728
base: develop
@@ -0,0 +1,18 @@
/*
Copyright 2015 Twitter, Inc.
Can we skip the bogus copyright?
- The year is wrong.
- Stripe may be paying you to write this code.
Yeah, I was going to ask whether you think we need to put the license and/or copyright in these at all.
*/
package com.twitter.scalding.serialization

case class Exported[T](instance: T) extends AnyVal
I don't understand this pattern. Can you document it or link to an explanation? I know @travisbrown likes it.
The general gist is that by supplying instances of Exported, you can import an object/package that provides them and have them injected at a lower priority than a normal import.
So here, if you import the default object, it only supplies low-priority implicits and won't override a user-supplied one.
https://github.com/milessabin/export-hook
(I can add a link in the code too.)
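A minimal sketch of the pattern described above may help. Apart from `Exported`, every name here (`Show`, `LowPriorityShow`, `DefaultShow`, `User`, `ExportedDemo`) is hypothetical and is not scalding's or export-hook's API. The PR's `Exported` also extends `AnyVal`; that is left out here only so the snippet can be pasted into any scope.

```scala
// Hypothetical illustration of the Exported pattern; not scalding's actual code.
final case class Exported[T](instance: T)

// Stand-in type class (the PR's real one is OrderedSerialization).
trait Show[T] { def show(t: T): String }

// Bridge living in the type class companion: any Exported[Show[T]] that is
// in scope can be turned into a Show[T].
trait LowPriorityShow {
  implicit def imported[T](implicit e: Exported[Show[T]]): Show[T] = e.instance
}
object Show extends LowPriorityShow

final case class User(name: String)
object User {
  // User-supplied instance in the companion object; this one should win.
  implicit val showUser: Show[User] = new Show[User] {
    def show(u: User): String = "companion:" + u.name
  }
}

// The "defaults" object: importing it only brings Exported wrappers into scope.
object DefaultShow {
  implicit val exportedInt: Exported[Show[Int]] =
    Exported(new Show[Int] { def show(i: Int): String = "default:" + i })
  // A competing default for User, present to show it does not take over.
  implicit val exportedUser: Exported[Show[User]] =
    Exported(new Show[User] { def show(u: User): String = "default:" + u.name })
}

object ExportedDemo {
  import DefaultShow._
  def render[T](t: T)(implicit s: Show[T]): String = s.show(t)

  val fromDefault: String = render(42)             // resolved through imported[Int]
  val fromCompanion: String = render(User("ada"))  // resolved through User.showUser
}
```

In this sketch the companion instance is preferred because a non-generic implicit value is considered more specific than the generic `imported` method, so the import of `DefaultShow._` only fills in types that have no instance of their own.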
import com.twitter.scalding.serialization.{OrderedSerialization, DefaultOrderedSerialization}
import scala.reflect.macros.whitebox

class ExportMacros(val c: whitebox.Context) {
are we actually calling this anywhere?
Nope, I think it's killable; I'm not sure it adds anything. I was discussing that with @travisbrown towards EOD here.
Right now the only exported instance has a concrete type, but once you have generically derived ones the macro here will be necessary (although it turns out it doesn't need to be whitebox, I believe). I've got some notes on this from when I was prepping this talk; I'll try to find them.
if (staticSize.isEmpty)
  in.readPosVarInt

_root_.scala.util.Success(unsafeRead(in))
Is the _root_ just an artifact of a previous macro context?
Yep, I think I've cleaned these up, so it should be more sane now.
@@ -510,7 +512,7 @@ class MacroOrderingProperties
}

test("Test out ByteBuffer") {
BinaryOrdering.ordSer[ByteBuffer]
implicitly[OrderedSerialization[ByteBuffer]]
It'd be nice to have a test confirming that an explicit instance doesn't get overridden by the DefaultOrderedSerialization import.
Done. I added a test showing that an ordered serialization defined in a companion object is not overridden.
OK, I think this is looking a bit more cleaned up and may be worth another look design-wise. I'd be slow to merge this as-is if we think develop should always be ready to be released, since for a release I think we would want everything the macro currently provides pulled out and split up. That would be binary breaking, since today a single method can be used to do everything. (We could possibly inject a different/new whitebox macro to smooth over most of this, but I'm not sure it's worth that versus just moving on to a nicer, split-up world.) There is also a pre-release question of whether the old-school macros should live under the same package/import as, say, our ByteBuffer implementations. That would make it harder to shop around and use the ByteBuffer implementation while also using Shapeless.
private[this] def noLengthWrite(element: T, outerOutputStream: OutputStream): Unit = {
  // Start with pretty big buffers because reallocation will be expensive
  val baos = new ByteArrayOutputStream(512)
  unsafeWrite(baos, element)
This implies that unsafeWrite means no size. Can we add that to the comments below?
It implies no outer size, which can be a bit wasteful. I'm adding a comment to the definition of this method that I hope will help a little here:
// This will write out the interior data as a blob with no prepended length
// This means binary compare cannot skip on this data.
// However the contract remains that one should be able to _read_ the data
// back out again.
def unsafeWrite(out: java.io.OutputStream, t: T): Unit
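To make the comment above concrete, here is a hedged sketch of how an unsized `unsafeWrite` could be wrapped with an outer length header so that a reader can skip a record without decoding it. The object and method names (`LengthPrefixSketch`, `sizedWrite`, `sizedRead`, `skipSized`) are hypothetical, the payload type is fixed to `String` for brevity, and a fixed 4-byte length stands in for the varint the PR reads with `readPosVarInt`.

```scala
import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream, InputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical sketch; not the PR's implementation.
object LengthPrefixSketch {
  // Unsized write: the raw payload only, with no length header.
  def unsafeWrite(out: OutputStream, s: String): Unit =
    out.write(s.getBytes(UTF_8))

  // Buffer the unsized payload, then emit [length][payload] to the outer stream.
  def sizedWrite(out: OutputStream, s: String): Unit = {
    val baos = new ByteArrayOutputStream(512)
    unsafeWrite(baos, s)
    val dos = new DataOutputStream(out)
    dos.writeInt(baos.size()) // assumption: fixed-width length instead of a varint
    baos.writeTo(dos)
    dos.flush()
  }

  // Reads one [length][payload] record back into a String.
  def sizedRead(in: InputStream): String = {
    val dis = new DataInputStream(in)
    val bytes = new Array[Byte](dis.readInt())
    dis.readFully(bytes)
    new String(bytes, UTF_8)
  }

  // Skips one record using only the header; the payload is never decoded.
  def skipSized(in: InputStream): Unit = {
    val dis = new DataInputStream(in)
    var remaining = dis.readInt()
    while (remaining > 0) {
      val skipped = dis.skipBytes(remaining)
      if (skipped > 0) remaining -= skipped
      else { dis.readByte(); remaining -= 1 } // forces progress, or throws EOFException at end of stream
    }
  }
}
```

Without the outer header, the only way past a record in this sketch would be to read it in full, which is the cost the comment describes.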

trait HasUnsafeCompareBinary[T] extends OrderedSerialization[T] {
  def unsafeCompareBinary(inputStreamA: InputStream, inputStreamB: InputStream): Int
  def unsafeWrite(out: java.io.OutputStream, t: T): Unit
Can we drop java.io? Also, I think this has to be the unsized output if a size header is ever added, right? It is used that way above, but that is not clear from the definition, which is confusing.
May it or may it not be sized? Can you add some laws about how to reason about these things?
  def unsafeCompareBinary(inputStreamA: InputStream, inputStreamB: InputStream): Int
  def unsafeWrite(out: java.io.OutputStream, t: T): Unit
  def unsafeRead(in: java.io.InputStream): T
  def unsafeSize(t: T): Option[Int]
What is the contract here? Similar concerns as above.
// Members declared in com.twitter.scalding.serialization.Serialization
def read(in: java.io.InputStream): scala.util.Try[T] = o.read(in)
def staticSize: Option[Int] = o.staticSize
def unsafeWrite(out: java.io.OutputStream, t: T): Unit = o.write(out, t).get
Here, unsafeWrite could have a size if the original did, but then I guess you could end up adding two sizes, couldn't you (since above we might call noLengthWrite)?
}

def unsafeRead(inputStream: java.io.InputStream): ByteBuffer = {
  val lenA = inputStream.readPosVarInt
do we write an additional length header on this thing currently?
Yep, we always write a length header for ByteBuffers in the existing master:
https://github.com/twitter/scalding/blob/develop/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/ordered_serialization/providers/ByteBufferOrderedBuf.scala#L82
What about this: can you update the PR to include three hand-written combinators: Tuple2OrderedSerialization, EitherOrderedSerialization, and ListOrderedSerialization? If we can do those three (static-sized product, static-sized sum/union, dynamic-sized product), I think we will see which methods we want to have to enable this.
You can tell something is wrong with OrderedSerialization because our current tuple2 has no way to avoid deserializing the second part.
If we exercise your code in the same PR that adds those three, I think we will see more clearly whether we have improved the API or are still missing something.
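As a rough illustration of the tuple2 concern, the sketch below shows a pair comparator that skips the second component whenever the first already decides the ordering. The `BinOrd` trait and its members are hypothetical and much smaller than scalding's `OrderedSerialization`; the `Int` instance is only there to make the example runnable, and for a fixed-width `Int` skipping and reading cost the same, so the benefit would show up with larger or variable-sized second components.

```scala
import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStream}

// Hypothetical minimal interface; not scalding's OrderedSerialization.
trait BinOrd[T] {
  def write(out: OutputStream, t: T): Unit
  // Compares one record from each stream and leaves both streams at the record end.
  def compareBinary(a: InputStream, b: InputStream): Int
  // Advances past one record without handing a T back to the caller.
  def skip(in: InputStream): Unit
}

object BinOrd {
  val intOrd: BinOrd[Int] = new BinOrd[Int] {
    def write(out: OutputStream, t: Int): Unit = new DataOutputStream(out).writeInt(t)
    def compareBinary(a: InputStream, b: InputStream): Int =
      Integer.compare(new DataInputStream(a).readInt(), new DataInputStream(b).readInt())
    def skip(in: InputStream): Unit = { new DataInputStream(in).readInt(); () }
  }

  def tuple2[A, B](oa: BinOrd[A], ob: BinOrd[B]): BinOrd[(A, B)] = new BinOrd[(A, B)] {
    def write(out: OutputStream, t: (A, B)): Unit = {
      oa.write(out, t._1)
      ob.write(out, t._2)
    }
    def compareBinary(a: InputStream, b: InputStream): Int = {
      val first = oa.compareBinary(a, b)
      if (first != 0) {
        // The first component decides; step over the second on both sides
        // so the streams still end up at the end of the record.
        ob.skip(a)
        ob.skip(b)
        first
      } else ob.compareBinary(a, b)
    }
    def skip(in: InputStream): Unit = { oa.skip(in); ob.skip(in) }
  }
}
```

The design point is that `skip` has to be part of the interface for the product combinator to stay composable: without it, the only way to reach the end of the record is to decode the second component.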
@@ -53,6 +54,11 @@ trait Serialization[T] extends Equiv[T] with Hashing[T] with Serializable {
 * otherwise the caller should just serialize into an ByteArrayOutputStream
 */
def dynamicSize(t: T): Option[Int]

// Override this to provide more efficient
def skip(in: InputStream): Try[Unit] = {
I think we may want def skip(count: Int, in: InputStream): Try[Unit] so that in, say, List[T] we can skip the rest of the collection. If count <= 0, do nothing; otherwise, in the worst case, just read the elements and throw them away as you do below.
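One possible shape for that default, sketched under assumptions: the `SkipSketch` trait and the `IntSkip` instance are hypothetical and only mirror the `read`/`skip` signatures quoted in this thread. The default reads and discards `count` records, and an implementation with a known size could override it with something cheaper.

```scala
import java.io.{DataInputStream, InputStream}
import scala.annotation.tailrec
import scala.util.{Success, Try}

// Hypothetical trait for illustration; not scalding's Serialization.
trait SkipSketch[T] {
  def read(in: InputStream): Try[T]

  // Skips `count` records. A count <= 0 is a no-op; the worst-case default
  // reads each record and throws it away, stopping at the first failure.
  def skip(count: Int, in: InputStream): Try[Unit] = {
    @tailrec
    def loop(remaining: Int): Try[Unit] =
      if (remaining <= 0) Success(())
      else read(in) match {
        case Success(_) => loop(remaining - 1)
        case failure    => failure.map(_ => ())
      }
    loop(count)
  }
}

// Example instance: fixed-width big-endian Ints.
object IntSkip extends SkipSketch[Int] {
  def read(in: InputStream): Try[Int] = Try(new DataInputStream(in).readInt())
}
```

A `List[T]` comparison could then call `skip(remainingElements, in)` on each side once the ordering is decided, rather than decoding the tail of the collection.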
 * This compares two InputStreams. After this call, the position in
 * the InputStreams may or may not be at the end of the record.
 */
def compareBinaryNoConsume(a: InputStream, b: InputStream): OrderedSerialization.Result = {
I get really worried about how to compose methods like this that lack a strong contract. Also, I don't see that we ever call this.
Failure(e)
}

override def compareBinaryNoConsume(inputStreamA: InputStream, inputStreamB: InputStream): OrderedSerialization.Result =
should this be final?
OrderedSerialization.CompareFailure(e)
}

override def compareBinary(inputStreamA: InputStream, inputStreamB: InputStream): OrderedSerialization.Result =
can this be final?
First extraction.
Right now I don't think we need the macros, and we can probably drop them again.
Naming, code organization, and how this should all get imported are open to opinions.
We could have the defaults show up in the package object for ordered serialization itself?
Thoughts? @johnynek @travisbrown