Scala implicit ordering fun -- have 2 forms of sketch, defaulting to … #1535

Open · wants to merge 4 commits into base: develop
@@ -428,13 +428,13 @@ abstract class FixedPathSource(path: String*) extends FileSource {
  override def equals(that: Any): Boolean = (that != null) && (that.toString == toString)

  /**
-  * Similar in behavior to {@link TimePathedSource.writePathFor}.
-  * Strip out the trailing slash star.
-  */
+   * Similar in behavior to {@link TimePathedSource.writePathFor}.
+   * Strip out the trailing slash star.
+   */
  protected def stripTrailing(path: String): String = {
    assert(path != "*", "Path must not be *")
    assert(path != "/*", "Path must not be /*")
-    if(path.takeRight(2) == "/*") {
+    if (path.takeRight(2) == "/*") {
      path.dropRight(2)
    } else {
      path
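
For reference, a minimal sketch of how stripTrailing behaves (editorial illustration, not part of the diff; the paths are hypothetical, and the method is protected, so these calls are only legal from inside a FixedPathSource subclass):

stripTrailing("hdfs://logs/2015/*")   // -> "hdfs://logs/2015"  (trailing "/*" dropped)
stripTrailing("hdfs://logs/2015")     // -> "hdfs://logs/2015"  (no trailing "/*", returned as-is)
stripTrailing("*")                    // fails the assert "Path must not be *"
stripTrailing("/*")                   // fails the assert "Path must not be /*"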
@@ -26,7 +26,7 @@ import com.twitter.algebird.{ Aggregator, Monoid, Semigroup }
import com.twitter.scalding.TupleConverter.{ TupleEntryConverter, singleConverter, tuple2Converter }
import com.twitter.scalding.TupleSetter.{ singleSetter, tup2Setter }
import com.twitter.scalding._
-import com.twitter.scalding.serialization.OrderedSerialization
+import com.twitter.scalding.serialization.{ OrderedSerialization, Serialization }
import com.twitter.scalding.serialization.OrderedSerialization.Result
import com.twitter.scalding.serialization.macros.impl.BinaryOrdering
import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._
@@ -38,7 +38,7 @@ import scala.util.Try
* This object is here rather than in the typed package because a lot of code was written using
* the functions in the object, which we do not see how to hide with package object tricks.
*/
-object TypedPipe extends Serializable {
+object TypedPipe extends Serializable with LowerPriorityTypedPipeEnrichements {
import Dsl.flowDefToRichFlowDef

/**
@@ -119,6 +119,41 @@ object TypedPipe extends Serializable {
override def hash(x: Int): Int = x
}
}
/**
* Enables joining when this TypedPipe has some keys with many, many values,
* but many keys with very few values. For instance, a graph where some nodes have
* millions of neighbors, but most have only a few.
*
* We build a (count-min) sketch of each key's frequency, and we use that
* to shard the heavy keys across many reducers.
* This increases communication cost in order to reduce the maximum time needed
* to complete the join.
*
* {@code pipe.sketch(100).join(thatPipe) }
* will add an extra map/reduce job over a standard join to create the count-min-sketch.
* This will generally only be beneficial if you have really heavy skew, where without
* this you have 1 or 2 reducers taking hours longer than the rest.
*/

implicit def toOrderedSerializionSketchMethod[K, V](tp: TypedPipe[(K, V)])(implicit orderedSerialization: OrderedSerialization[K]) = new {
def sketch(reducers: Int,
eps: Double = 1.0E-5, //272k width = 1MB per row
delta: Double = 0.01, //5 rows (= 5 hashes)
seed: Int = 12345): Sketched[K, V] =
Sketched(tp, reducers, delta, eps, seed)(Serialization.toBytes[K](_), orderedSerialization)
}

}
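
A minimal usage sketch of the higher-priority enrichment above (editorial illustration, not part of the diff; the pipes are hypothetical, and it assumes the macro-derived OrderedSerialization from BinaryOrdering._, as the test jobs below do):

import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._

val skewed: TypedPipe[(Int, Int)] = ???   // hypothetical pipe with a few very heavy keys
val small: TypedPipe[(Int, Int)] = ???    // hypothetical right-hand side

// With an OrderedSerialization[Int] in scope, toOrderedSerializionSketchMethod applies and the
// key bytes come from Serialization.toBytes, so no separate Int => Array[Byte] implicit is needed.
skewed
  .sketch(100)
  .join(small)

For the defaults, assuming the usual count-min sizing of width = ceil(e / eps) and depth = ceil(ln(1 / delta)), eps = 1.0E-5 works out to roughly 272,000 counters per row and delta = 0.01 to 5 rows (hashes), which is what the inline comments refer to.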

trait LowerPriorityTypedPipeEnrichements {
implicit def toOrderingPlusFnSketchMethod[K, V](tp: TypedPipe[(K, V)]) = new {
def sketch(reducers: Int,
eps: Double = 1.0E-5, //272k width = 1MB per row
delta: Double = 0.01, //5 rows (= 5 hashes)
seed: Int = 12345)(implicit serialization: K => Array[Byte],
ordering: Ordering[K]): Sketched[K, V] =
Sketched(tp, reducers, delta, eps, seed)
}
}
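
And a sketch of the lower-priority fallback (again editorial, not part of the diff): when no OrderedSerialization[K] is in scope, the compiler falls back to toOrderingPlusFnSketchMethod, which still needs an explicit key serializer plus an Ordering[K] (the BothAvailableSketchMethods test job below defines such a serializer, though it also has the macro ordering in scope). The String-keyed pipes here are hypothetical:

// Fallback path: no OrderedSerialization[String] imported here, so the caller provides both implicits.
implicit def keyToBytes(k: String): Array[Byte] = k.getBytes("UTF-8")  // hypothetical key serializer
// Ordering[String] is already provided by the standard library.

val byName: TypedPipe[(String, Long)] = ???   // hypothetical skewed pipe
val lookup: TypedPipe[(String, Long)] = ???   // hypothetical

byName.sketch(50).leftJoin(lookup)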

/**
@@ -690,29 +725,6 @@ trait TypedPipe[+T] extends Serializable {
.hashLeftJoin(grouped)
.map { case (t, (_, optV)) => (t, optV) }

-  /**
-   * Enables joining when this TypedPipe has some keys with many many values and
-   * but many with very few values. For instance, a graph where some nodes have
-   * millions of neighbors, but most have only a few.
-   *
-   * We build a (count-min) sketch of each key's frequency, and we use that
-   * to shard the heavy keys across many reducers.
-   * This increases communication cost in order to reduce the maximum time needed
-   * to complete the join.
-   *
-   * {@code pipe.sketch(100).join(thatPipe) }
-   * will add an extra map/reduce job over a standard join to create the count-min-sketch.
-   * This will generally only be beneficial if you have really heavy skew, where without
-   * this you have 1 or 2 reducers taking hours longer than the rest.
-   */
-  def sketch[K, V](reducers: Int,
-    eps: Double = 1.0E-5, //272k width = 1MB per row
-    delta: Double = 0.01, //5 rows (= 5 hashes)
-    seed: Int = 12345)(implicit ev: TypedPipe[T] <:< TypedPipe[(K, V)],
-      serialization: K => Array[Byte],
-      ordering: Ordering[K]): Sketched[K, V] =
-    Sketched(ev(this), reducers, delta, eps, seed)

/**
* If any errors happen below this line, but before a groupBy, write to a TypedSink
*/
scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala (113 additions, 0 deletions)
@@ -1348,6 +1348,25 @@ class TypedSketchJoinJob(args: Args) extends Job(args) {
.write(TypedText.tsv[(Int, Int, Int)]("output-join"))
}

class OrderedSerializationTypedSketchJoinJob(args: Args) extends Job(args) {
val zero = TypedPipe.from(TypedText.tsv[(Int, Int)]("input0"))
val one = TypedPipe.from(TypedText.tsv[(Int, Int)]("input1"))

import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._

zero
.sketch(args("reducers").toInt)
.join(one)
.map{ case (k, (v0, v1)) => (k, v0, v1) }
.write(TypedText.tsv[(Int, Int, Int)]("output-sketch"))

zero
.group
.join(one.group)
.map{ case (k, (v0, v1)) => (k, v0, v1) }
.write(TypedText.tsv[(Int, Int, Int)]("output-join"))
}

class TypedSketchLeftJoinJob(args: Args) extends Job(args) {
val zero = TypedPipe.from(TypedText.tsv[(Int, Int)]("input0"))
val one = TypedPipe.from(TypedText.tsv[(Int, Int)]("input1"))
@@ -1367,6 +1386,46 @@ class TypedSketchLeftJoinJob(args: Args) extends Job(args) {
.write(TypedText.tsv[(Int, Int, Int)]("output-join"))
}

class OrderedSerializationTypedSketchLeftJoinJob(args: Args) extends Job(args) {
val zero = TypedPipe.from(TypedText.tsv[(Int, Int)]("input0"))
val one = TypedPipe.from(TypedText.tsv[(Int, Int)]("input1"))

import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._

zero
.sketch(args("reducers").toInt)
.leftJoin(one)
.map{ case (k, (v0, v1)) => (k, v0, v1.getOrElse(-1)) }
.write(TypedText.tsv[(Int, Int, Int)]("output-sketch"))

zero
.group
.leftJoin(one.group)
.map{ case (k, (v0, v1)) => (k, v0, v1.getOrElse(-1)) }
.write(TypedText.tsv[(Int, Int, Int)]("output-join"))
}

class BothAvailableSketchMethodsTypedSketchLeftJoinJob(args: Args) extends Job(args) {
[Review comment from a Contributor] This job looks unused in the test specs below.

val zero = TypedPipe.from(TypedText.tsv[(Int, Int)]("input0"))
val one = TypedPipe.from(TypedText.tsv[(Int, Int)]("input1"))

import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._
implicit def serialize(k: Int) = k.toString.getBytes

zero
.sketch(args("reducers").toInt)
.leftJoin(one)
.map{ case (k, (v0, v1)) => (k, v0, v1.getOrElse(-1)) }
.write(TypedText.tsv[(Int, Int, Int)]("output-sketch"))

zero
.group
.leftJoin(one.group)
.map{ case (k, (v0, v1)) => (k, v0, v1.getOrElse(-1)) }
.write(TypedText.tsv[(Int, Int, Int)]("output-join"))
}


object TypedSketchJoinTestHelper {
import Dsl._

@@ -1428,6 +1487,33 @@ class TypedSketchJoinJobTest extends WordSpec with Matchers {
sk shouldBe inner
}
}

"A TypedSketchJoinJob using OrderedSerialization" should {
"get the same result as an inner join" in {
val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchJoinJob(_), 10, x => 1)
sk shouldBe inner
}

"get the same result when half the left keys are missing" in {
val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchJoinJob(_), 10, x => if (x < 50) 0 else 1)
sk shouldBe inner
}

"get the same result with a massive skew to one key" in {
val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchJoinJob(_), 10, x => if (x == 50) 1000 else 1)
sk shouldBe inner
}

"still work with only one reducer" in {
val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchJoinJob(_), 1, x => 1)
sk shouldBe inner
}

"still work with massive skew and only one reducer" in {
val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchJoinJob(_), 1, x => if (x == 50) 1000 else 1)
sk shouldBe inner
}
}
}

class TypedSketchLeftJoinJobTest extends WordSpec with Matchers {
@@ -1460,4 +1546,31 @@ class TypedSketchLeftJoinJobTest extends WordSpec with Matchers {
sk shouldBe inner
}
}

"A OrderedSerialization TypedSketchLeftJoinJob" should {
"get the same result as a left join" in {
val (sk, left) = runJobWithArguments(new OrderedSerializationTypedSketchLeftJoinJob(_), 10, x => 1)
sk shouldBe left
}

"get the same result when half the left keys are missing" in {
val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchLeftJoinJob(_), 10, x => if (x < 50) 0 else 1)
sk shouldBe inner
}

"get the same result with a massive skew to one key" in {
val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchLeftJoinJob(_), 10, x => if (x == 50) 1000 else 1)
sk shouldBe inner
}

"still work with only one reducer" in {
val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchLeftJoinJob(_), 1, x => 1)
sk shouldBe inner
}

"still work with massive skew and only one reducer" in {
val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchLeftJoinJob(_), 1, x => if (x == 50) 1000 else 1)
sk shouldBe inner
}
}
}