This repository has been archived by the owner on Jun 17, 2024. It is now read-only.

Question: computation scheduling? #15

Open
Frank-Wu opened this issue Jul 29, 2014 · 6 comments

Comments

@Frank-Wu

Hi Naiad developers, the iterative model in Naiad is quite attractive. But I am wondering what happens if a previous round of computation is still running when a new event comes in. For example, in the connected components case, let's say edge <i, j> comes in and triggers some computation. Before the computation ends, edge <m, n> arrives. Will edge <m, n> be buffered, or can the two rounds of computation be performed simultaneously? Thanks!

@frankmcsherry
Contributor

Hi Frank,

The edge will not be buffered. Multiple rounds of computation can occur simultaneously in Naiad.

However, it is important to note that this is not because a central scheduler decides that multiple rounds can be run concurrently. Naiad's execution model is a distributed dataflow model, in which records are sent as soon as they are available, and can be received at their destination at any point thereafter. What the recipient chooses to do with the record depends on its logic, and it may choose to buffer the input until, for example, it is sure it has received all records for a given timestamp (combination of epoch and loop counters). Or, it may choose to act on the record if it has enough information to produce a correct output.

As an example, the differential dataflow implementation of connected components distributed with Naiad has an iterative computation, and is fed by incremental inputs. If you supply the computation with more data before it has finished with its previous work, the two (or more) epochs of input will be processed concurrently. You can see this in the example (and probably cause it to crash horribly due to resource exhaustion) if you remove the Console.ReadLine(); and output.Sync(i + 1); lines from the program; it will introduce a substantial number of differences without waiting for Naiad to complete any of them. To avoid crashing and still see the behavior, you should be able to replace output.Sync(i + 1); with output.Sync(i - 10); to allow 10 or 11 epochs to be in flight at any moment.

Hope this helps,
Frank
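
To make the "10 or 11 epochs in flight" arithmetic from the reply above concrete, here is a minimal Python sketch. It is not Naiad code: `run`, `lag`, and the in-flight set are hypothetical names, and the sync call is modeled, by assumption, as simply waiting until every epoch up to the given one has completed.

```python
def run(num_epochs, lag):
    """Toy model: submit epoch i, then wait for epoch i - lag to complete.

    Returns every in-flight count observed right after a submit and right
    after a sync. This models only the bookkeeping, not Naiad itself.
    """
    in_flight = set()
    observed = set()
    for i in range(num_epochs):
        in_flight.add(i)                  # stands in for supplying epoch i
        observed.add(len(in_flight))
        # Stands in for output.Sync(i - lag): all epochs <= i - lag finish.
        in_flight = {e for e in in_flight if e > i - lag}
        observed.add(len(in_flight))
    return observed

counts = run(num_epochs=100, lag=10)
print(max(counts))          # largest number of epochs in flight at once
print(sorted(counts)[-2:])  # the two steady-state values
```

Under these assumptions the window settles at 10 epochs right after each sync and 11 right after each submit, which is consistent with the figure quoted in the reply.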

@Frank-Wu
Author

Hi Frank,

Your answer really helps a lot! In order to get a better understanding, let's look at a connected component example.

When the first edge comes in, it triggers a series of computations and generates a row in the lattice: <1,1>, <1,2>, <1,3>, <1,4>, ... Likewise, once the second edge comes in, another row of the lattice is generated: <2,1>, <2,2>, <2,3>, <2,4>, ... According to the technical report, it seems that the computation at time <2,2> depends on the computations at times <1,1>, <1,2>, and <2,1>. So my question is: can the computations at times <2,2>, <2,3>, <2,4>, ... be triggered if the computation at time <1,2> remains unfinished? If they can be triggered, that may lead to duplicated computation. But if not, then it seems that a synchronization barrier is required to order the computation, correct?

I hope I have made the problem clear. The lattice I referred to can be found in Figure 14 of the technical report. Thanks!

Regards,
Frank

@frankmcsherry
Contributor

Hi Frank,

You are right that because (2,2) and (2,3) and (2,4) depend on the value at (1,2), differential dataflow will not produce outputs until that information is available. However, this is a choice of the implementation of differential dataflow, rather than something enforced by Naiad. We used to have a version of differential dataflow without synchronization, which would proceed even without the fully correct values at (1,2), on the premise that most of the work would be correct and that fixing it up would be faster. For many reasons (performance, correctness, crashing), this ends up being pretty unsafe.

From a Naiad point of view, there are two things the system can do for a vertex:

  1. It can call OnReceive with additional data.
  2. It can call OnNotify with a timestamp it is sure is complete.

Naiad will happily deliver messages with timestamps (2,4) even if there are still outstanding (1,2) messages. If a vertex wants to take action, it can (and many vertices, for example Select, Where, Concat, and Except, ... can, because they are stateless). If the vertex would rather buffer the message internally until it is signaled that prior work is done, it can do this too.

Naiad will not deliver OnNotify messages for (2,2), (2,3), or (2,4) while records for (1,2) exist. Once (1,2) and other times are cleaned up, it will deliver notifications for (2,2), which allows the implementation of barrier semantics. However, the system itself does not experience a synchronization barrier: other parts of the dataflow graph happily continue computation and deliver messages without interruption.

Cheers,
Frank
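
The distinction drawn in the reply above (messages are delivered at any time, notifications only once earlier work is done) can be sketched with a small Python toy. This is an illustration under stated assumptions, not Naiad's progress tracking: `ToyProgressTracker`, `less_equal`, and the method names are hypothetical, timestamps are plain (epoch, iteration) pairs compared coordinate-wise, and the model ignores the dataflow graph and distribution entirely.

```python
from collections import Counter

def less_equal(a, b):
    """Coordinate-wise partial order on (epoch, iteration) timestamps."""
    return a[0] <= b[0] and a[1] <= b[1]

class ToyProgressTracker:
    """Counts outstanding messages per timestamp (single-vertex toy model)."""

    def __init__(self):
        self.outstanding = Counter()

    def send(self, time):
        self.outstanding[time] += 1

    def deliver(self, time):
        # Messages may be delivered in any order, whatever else is pending.
        assert self.outstanding[time] > 0
        self.outstanding[time] -= 1

    def can_notify(self, time):
        # A notification for `time` is allowed only when no outstanding
        # message carries a timestamp less than or equal to `time`.
        return not any(
            count > 0 and less_equal(t, time)
            for t, count in self.outstanding.items()
        )

tracker = ToyProgressTracker()
tracker.send((1, 2))
tracker.send((2, 4))

tracker.deliver((2, 4))                # allowed while (1, 2) is outstanding
blocked = tracker.can_notify((2, 2))   # (1, 2) is still pending
tracker.deliver((1, 2))
released = tracker.can_notify((2, 2))  # nothing at or before (2, 2) remains
print(blocked, released)
```

In this toy, the notification for (2,2) is withheld while the (1,2) message is outstanding and becomes available once it is delivered, while the (2,4) message is delivered without waiting for anything.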

@Frank-Wu
Author

Hi Frank,

Thanks for your explanation. I think I have understood how Naiad works!

For the connected components example, a message should be buffered until all the messages it depends on have arrived (although such buffering is not strictly necessary and can be eliminated by taking certain risks).

So I think we can implement the connected component example as follows:

class ConnectedComponent : Vertex
{
    void OnReceive(Edge e, S msg, T time)
    {
        // record the edge here
        ...
        // buffer the message and request a notification for this time
        this.NotifyAt(time);
    }

    // called once every message this time depends on has arrived,
    // e.g., the notification for (2,2) comes once messages (1,1), (1,2), (2,1) have arrived
    void OnNotify(T time)
    {
        // do the real computation here
        ....
    }
}

The OnNotify() function can guarantee all dependent messages have arrived. The ingress vertex and egress vertex must be centralized in order to guarantee the message ordering.

Please correct me if there's something wrong, thanks!

Regards,
Frank

@frankmcsherry
Contributor

This looks correct. The differential dataflow vertex implementations are based on the UnaryBufferingVertex and BinaryBufferingVertex types (in the Naiad.Dataflow.StandardVertices namespace), which follow this pattern.
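
For readers who want to experiment with the buffer-then-notify pattern discussed above without Naiad, here is a minimal Python sketch. It is not the UnaryBufferingVertex implementation: `ToyBufferingVertex`, `compute`, and the method names are hypothetical, and the caller plays the role of the system by deciding when to invoke `on_notify`.

```python
from collections import defaultdict

class ToyBufferingVertex:
    """Buffers records per timestamp and computes only when notified."""

    def __init__(self, compute):
        self.compute = compute            # hypothetical per-batch function
        self.buffers = defaultdict(list)  # time -> records received so far
        self.requested = set()            # times with a pending notification
        self.outputs = {}                 # time -> result of compute

    def on_receive(self, record, time):
        self.buffers[time].append(record)
        self.requested.add(time)          # stands in for requesting a notification

    def on_notify(self, time):
        # Called by the (toy) system once `time` is known to be complete.
        self.requested.discard(time)
        batch = self.buffers.pop(time, [])
        self.outputs[time] = self.compute(batch)

vertex = ToyBufferingVertex(compute=sorted)
vertex.on_receive(("m", "n"), (2, 1))
vertex.on_receive(("i", "j"), (1, 1))
vertex.on_receive(("a", "b"), (1, 1))

before = dict(vertex.outputs)   # nothing is produced on receipt alone
vertex.on_notify((1, 1))
after = dict(vertex.outputs)    # the (1, 1) batch is processed as a whole
print(before, after)
```

The design choice is that `on_receive` does no real work, so the result for a timestamp never reflects a partial batch. The cost is that records wait in memory until their notification arrives.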

@Frank-Wu
Author

Hi Frank,

Thanks for your kind help! The Naiad model is pretty nice and I will keep exploring more about it! Thanks!

Regards,
Frank
