[REEF-2054] Elastic group communication: broadcast#1487
[REEF-2054] Elastic group communication: broadcast#1487interesaaat wants to merge 29 commits intoapache:masterfrom
Conversation
motus
left a comment
There was a problem hiding this comment.
OK, I think I'll start it slow. I'll bundle my comments into groups of 20 or so, and keep reviewing as you push the updates. This way we can work in parallel.
lang/cs/Org.Apache.REEF.Network.Examples.Client/ElasticBroadcastClient.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network.Examples.Client/ElasticBroadcastClient.cs
Outdated
Show resolved
Hide resolved
| if (testToRun.Equals("ElasticBroadcast".ToLower()) || testToRun.Equals("all")) | ||
| { | ||
| new ElasticBroadcastClient(runOnYarn, numNodes, startPort, portRange); | ||
| Console.WriteLine("ElasticRunBroadcast completed!!!"); |
There was a problem hiding this comment.
Shouldn't we write it to the log instead?
There was a problem hiding this comment.
Should we? This is the client, I see that often we print directly to console, no?
lang/cs/Org.Apache.REEF.Network.Examples/Elastic/BroadcastMasterTask.cs
Outdated
Show resolved
Hide resolved
| for (int pos = _position; pos < _operators.Count; pos++) | ||
| { | ||
| _operators[pos].ResetPosition(); | ||
| } |
There was a problem hiding this comment.
why not just
foreach (var op in _operators)
{
op.ResetPosition();
}There was a problem hiding this comment.
This is tricky. When you have iterators in the pipeline you only reset each time within the context of an iteration. The for loop is used because the position tells where to actually start to reset.
There was a problem hiding this comment.
oh, I see. I have not noticed that it starts from the _position. Then, are we sure that it never starts from -1?
There was a problem hiding this comment.
Yes. ResetOperatorPositions is private and only used inside MoveNext where the first operation is _position++.
| // Check if we need to iterate | ||
| if (_iteratorsPosition.Count > 0 && _position == _iteratorsPosition[0]) | ||
| { | ||
| var iteratorOperator = _operators[_position] as IElasticIterator; |
There was a problem hiding this comment.
I don't like that cast. Can we enforce that _operators is a list of IElasticIterator elements instead?
There was a problem hiding this comment.
Not really because _operators contains the sequence of all operators composing the workflow. I could make all operators be an IElasticIterator of size 0, but this is less clean than using a cast I believe. I am open for suggestions tho.
There was a problem hiding this comment.
I see. I actually would not mind having an empty IElasticIterator implementation, but it is up to you here.
P.S. Note that we don't have to implement the actual iterator: IElasticOperator can just inherit from IEnumerable and return an empty sequence from .GetEnumerator() by default.
There was a problem hiding this comment.
What about this:
IElasticIterator iteratorOperator;
switch(_operators[_position])
{
case IElasticIterator iter:
iteratorOperator = iter;
break;
default:
throw new IllegalStateException("Operator not Iterator");
break;
}|
Ready for the second round! |
motus
left a comment
There was a problem hiding this comment.
Few very minor comments for today. Check for naming conventions and make sure you use {0}-style string interpolation in logging instead of string concatenation or $"". (Note that for exceptions it's the other way around)
lang/cs/Org.Apache.REEF.Network.Examples/Elastic/BroadcastMasterTask.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network.Examples/Elastic/BroadcastMasterTask.cs
Outdated
Show resolved
Hide resolved
motus
left a comment
There was a problem hiding this comment.
A few more substantial comments this time. I think we can shorten the code quite a bit
...Apache.REEF.Network.Examples.Client/ElasticBroadcastClientWithFailEvaluatorBeforeWorkflow.cs
Outdated
Show resolved
Hide resolved
...Apache.REEF.Network.Examples.Client/ElasticBroadcastClientWithFailEvaluatorBeforeWorkflow.cs
Outdated
Show resolved
Hide resolved
|
Few very minor comments for today. Check for naming conventions and make
> sure you use {0}-style string interpolation in logging instead of string
> concatenation or $"". (Note that for exceptions it's the other way
> around)
Can you please explain to me the intuition behind this?
Sure! In case of explicit string concatenation or $"..." - style
interpolation, the entire string is evaluated *before* the method
invocation. This is exactly what we need for exceptions and most of other
methods and constructors as well.
For logging, the situation is different, as we don't know if the string
will be used at all. If the log level is higher than the priority of the
current log message, the logging call is basically a no-op, and the string
parameter will be ignored. That is, if we build a string *before* the
logging call, it can be a big waste, especially if the log message is long
and expensive to construct. This is why for logging we want to perform
string interpolation *inside* the logging method, so the logger can do it
only when it knows for sure that we need to log that string. Sometimes we
even wrap the logging call into Logger.IsLoggable() check, if interpolation
at the logger level is not possible.
Internal logging string interpolation uses {0}-style placeholders, hence
the rule: use {0} style for logging, $"..." for everything else.
Hope that helps! :)
Cheers,
Sergiy.
|
- Fixed some line length< 120 - Fixed some var initilization in constructors
motus
left a comment
There was a problem hiding this comment.
Added some more comments. will continue tomorrow
.../Org.Apache.REEF.Network.Examples/Elastic/WithFailures/ElasticBroadcastDriverWithFailures.cs
Show resolved
Hide resolved
.../Org.Apache.REEF.Network.Examples/Elastic/WithFailures/ElasticBroadcastDriverWithFailures.cs
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Failures/IFailureStateMachine.cs
Outdated
Show resolved
Hide resolved
.../Org.Apache.REEF.Network.Examples/Elastic/WithFailures/ElasticBroadcastDriverWithFailures.cs
Outdated
Show resolved
Hide resolved
Fixed API for generic new threshold for the failre machine.
.../Org.Apache.REEF.Network.Examples/Elastic/WithFailures/ElasticBroadcastDriverWithFailures.cs
Outdated
Show resolved
Hide resolved
Added params for better set of threshould in the failure machine
motus
left a comment
There was a problem hiding this comment.
left a few minor comments. Will add more soon!
lang/cs/Org.Apache.REEF.Network.Examples/Elastic/ElasticBroadcastDriver.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network.Examples/Elastic/ElasticBroadcastDriver.cs
Show resolved
Hide resolved
| { | ||
| NumOfDataPoints = initalPoints; | ||
| NumOfFailedDataPoints = initalPoints; | ||
| State = new DefaultFailureState((int)initalState); |
There was a problem hiding this comment.
we can define constructor
public DefaultFailureState(DefaultFailureStates state)to avoid the explicit cast here
There was a problem hiding this comment.
Mmm not sure. Because I will have to do a cast later on because FailureState is an int (cannot be a DefaultFailureState because it's defined in the base class).
There was a problem hiding this comment.
Yeah we have to discuss that. I still cannot think of something elegant.
lang/cs/Org.Apache.REEF.Network/Elastic/Failures/Default/DefaultFailureStateMachine.cs
Outdated
Show resolved
Hide resolved
motus
left a comment
There was a problem hiding this comment.
Another round of comments - will add more tomorrow!
| for (int pos = _position; pos < _operators.Count; pos++) | ||
| { | ||
| _operators[pos].ResetPosition(); | ||
| } |
There was a problem hiding this comment.
oh, I see. I have not noticed that it starts from the _position. Then, are we sure that it never starts from -1?
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Physical/Default/DefaultOneToN.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Physical/Default/DefaultOneToN.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Physical/Default/DefaultOneToN.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Physical/Default/DefaultOneToN.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Physical/IElasticIterator.cs
Outdated
Show resolved
Hide resolved
motus
left a comment
There was a problem hiding this comment.
added a few more comments. To be continued!
lang/cs/Org.Apache.REEF.Network/Elastic/Task/CancellationSource.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/CancellationSource.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/CommunicationLayer.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/CommunicationLayer.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/CommunicationLayer.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/CommunicationLayer.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Physical/Default/DefaultBroadcastTopology.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Physical/Default/DefaultBroadcastTopology.cs
Outdated
Show resolved
Hide resolved
...e.REEF.Network/Elastic/Topology/Physical/Default/OperatorTopologyWithDefaultCommunication.cs
Outdated
Show resolved
Hide resolved
...e.REEF.Network/Elastic/Topology/Physical/Default/OperatorTopologyWithDefaultCommunication.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/CommunicationLayer.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/CommunicationLayer.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/CommunicationLayer.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/CommunicationLayer.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/CommunicationLayer.cs
Outdated
Show resolved
Hide resolved
motus
left a comment
There was a problem hiding this comment.
added a few more comments. will continue tomorrow
lang/cs/Org.Apache.REEF.Network/Elastic/Comm/ITaskMessageResponse.cs
Outdated
Show resolved
Hide resolved
| /// <param name="message">The task message for the operator</param> | ||
| /// <param name="returnMessages">A list of messages containing the instructions for the task</param> | ||
| /// <exception cref="IllegalStateException">If the message cannot be handled correctly or generate an incorrent state</exception> | ||
| void OnTaskMessage(ITaskMessage message, ref List<IElasticDriverMessage> returnMessages); |
There was a problem hiding this comment.
Note to self: why ref? isn't it against the idea of asynchronous event handlers? Also, maybe we should separate input and output messages? i.e. do one of:
// mesages in, messages out, in the most generic container
IEnumerable<IElasticDriverMessage> OnTaskMessage(
ITaskMessage message, IEnumerable<IElasticDriverMessage> driverMessages);
// messages in, async handler for messages out
void OnTaskMessage(ITaskMessage message,
IEnumerable<IElasticDriverMessage> driverMessages,
EventHandler<IEnumerable<IElasticDriverMessage>> handler = null); // or a delegateneed to track the usage of this interface and give it more thought later...
lang/cs/Org.Apache.REEF.Network/Elastic/Comm/Impl/UpdateMessagePayload.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Comm/Impl/TopologyMessagePayload.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Comm/Impl/TopologyMessagePayload.cs
Outdated
Show resolved
Hide resolved
| /// <param name="buffer">The memory space where to copy the serialized update</param> | ||
| /// <param name="offset">Where to start writing in the buffer</param> | ||
| /// <param name="updates">The updates to serialize</param> | ||
| internal static void Serialize(byte[] buffer, ref int offset, List<TopologyUpdate> updates) |
There was a problem hiding this comment.
By the way, why are we serializing everything like that? is it for Java interoperability?
There was a problem hiding this comment.
Not really. Where you suggesting to use codecs? I found that the previous group communication uses a similar strategy and I simply piggyback on that. I actually like to have the serialization logic within the message class.
lang/cs/Org.Apache.REEF.Network/Elastic/Comm/Impl/ElasticDriverMessageImpl.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/IElasticTaskSetManager.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticTaskSetManager.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticTaskSetManager.cs
Outdated
Show resolved
Hide resolved
|
|
||
| _hasProgress = true; | ||
| var id = Utils.GetContextNum(activeContext) - 1; | ||
| var taskId = Utils.BuildTaskId(StagesId, id + 1); |
There was a problem hiding this comment.
man, this +/- 1 thing is really confusing. let's come back to it later and discuss some more elegant solution
There was a problem hiding this comment.
I know! the fact is that task infos are internally stored 0-indexed while externally they are used as 1-indexed. The solution here could be to just 0-index everywhere.
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticTaskSetManager.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/ElasticOperator.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/ElasticOperator.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/ElasticOperator.cs
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/ElasticOperator.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/ElasticOperator.cs
Outdated
Show resolved
Hide resolved
| NewEvaluatorNumCores = numCores; | ||
| NewEvaluatorMemorySize = memorySize; | ||
|
|
||
| System.Threading.Tasks.Task.Factory.StartNew(() => Clock.Run(), TaskCreationOptions.LongRunning); |
There was a problem hiding this comment.
just
System.Threading.Tasks.Task.Factory.StartNew(Clock.Run, TaskCreationOptions.LongRunning);There was a problem hiding this comment.
Also, I feel a bit uneasy about starting a new thread from the injectable constructor. it's hard to track when exactly it will run and when we'll stop it. I would rather have a separate .Start() method (maybe not in this class), and the corresponding Stop()/Dispose()
There was a problem hiding this comment.
I didn't 100% get it. You are suggesting something like starting the clock AFTER (outside) the constructor? So to flow will be (1) inject the constructor, (2) invoke Start?
lang/cs/Org.Apache.REEF.Network/Elastic/Failures/FailuresClock.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Failures/FailuresClock.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Failures/FailuresClock.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticTaskSetManager.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticTaskSetManager.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticTaskSetManager.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticTaskSetManager.cs
Outdated
Show resolved
Hide resolved
| { | ||
| if (!_completed) | ||
| { | ||
| _completed = _stages.Select(stage => stage.Value.IsCompleted).Aggregate((com1, com2) => com1 && com2); |
There was a problem hiding this comment.
maybe
_completed |= _stages.Values.All(stage => stage.IsCompleted);and there is no need for if
There was a problem hiding this comment.
Why there is no need for if? I want to print only once when we move from not complete to complete.
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticStage.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticStage.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticStage.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticStage.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticStage.cs
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticContext.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticContext.cs
Show resolved
Hide resolved
| lock (_statusLock) | ||
| { | ||
| _failureStatus = _failureStatus.Merge( | ||
| new DefaultFailureState((int)DefaultFailureStates.ContinueAndReconfigure)); |
There was a problem hiding this comment.
I really don't like that we have three different representations for the failure state - int, enum, and class/interface. That leads to awkward casts and news. I am still thinking about the most elegant way to resolve this; one approach would be to remove the DefaultFailureStates enum, and go old Java style, like this:
public sealed class DefaultFailureState : IFailureState
{
public static readonly DefaultFailureState Continue = new DefaultFailureState(0);
public static readonly DefaultFailureState ContinueAndReconfigure = new DefaultFailureState(1);
// ...then here we can just write
_failureStatus = _failureStatus.Merge(DefaultFailureState.ContinueAndReconfigure);but I am not 100% sure... maybe we can keep the enum, ditch the IFailureState interface, and use implicit operators to convert between the class and enum..
There was a problem hiding this comment.
I agree that is weird. I personally like the IFailureState interface because it allows to implement different failure states and related ways of merging them (this is the behavior I want for failure states). If I have to chose, I would prefer to maintain the interface and implement the default behavior in a different way (e.g., as you were suggesting, without the enum).
| lock (_statusLock) | ||
| { | ||
| _failureStatus = _failureStatus.Merge( | ||
| new DefaultFailureState((int)DefaultFailureStates.ContinueAndReschedule)); |
There was a problem hiding this comment.
also, if we are OK with mutable the FailureState property (and it is currently fully read and write accessible from the outside - maybe we should tighten it up a bit?), we can write e.g.
_failureStatus.Update(DefaultFailureState.ContinueAndReschedule);There was a problem hiding this comment.
Are you proposing to implement a default behavior for Merge when mutating? I like it!
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticStage.cs
Outdated
Show resolved
Hide resolved
|
Let me take track of the major changes I will have to do (once you are done with the "syntactic" pass). I will update this list as we agree on the changes.
|
motus
left a comment
There was a problem hiding this comment.
added a few minor things. Will write more later today
| // and duration will update to reflect the new alarm's timestamp | ||
| for (long duration = _timer.GetDuration(_schedule.First().TimeStamp); | ||
| duration > 0; | ||
| duration = _timer.GetDuration(_schedule.First().TimeStamp)) |
There was a problem hiding this comment.
Note that we don't lock the _schedule here - is that Ok?
There was a problem hiding this comment.
I copied this from wake. I think it is probably ok: additions are synchronized so in the worst case you will remove and event that was just added and not the one you were waiting for.
lang/cs/Org.Apache.REEF.Network/Elastic/Failures/FailuresClock.cs
Outdated
Show resolved
Hide resolved
|
|
||
| DisposeActiveContext(); | ||
|
|
||
| _isDisposed = true; |
There was a problem hiding this comment.
Do we really need _isDisposed flag, when we already have separate ones for task and the context? I would simply write the .Dispose() method as
public void Dispose()
{
DisposeTask();
DisposeActiveContext();
}-- after all, we don't call .Dispose() that often (and we don't set _isDisposed in .DropRuntime() anyway 😄)
There was a problem hiding this comment.
We should discuss on this. Disposing and dropping runtimes are different. You drop runtimes when the tasks fail, while you dispose them when the task manager finishes / fails. I think that more than removing _isDisposed I should add it in other places so that we cannot add a runtime to a disposed task for instance.
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticTaskSetManager.cs
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticTaskSetManager.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticTaskSetManager.cs
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticTaskSetManager.cs
Outdated
Show resolved
Hide resolved
motus
left a comment
There was a problem hiding this comment.
Done with the Failures folder. Left a few comments, all minor. Still, the thing that bothers me most is type incompatibility between DefaultFailureStates and IFailureState. We should think of something more elegant.
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticTaskSetManager.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticTaskSetManager.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticTaskSetManager.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Driver/Default/DefaultElasticContext.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Failures/Default/DefaultFailureStateMachine.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Failures/Default/DefaultFailureStateMachine.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Failures/Default/DefaultFailureStateMachine.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Failures/Default/DefaultFailureStateMachine.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Failures/Default/DefaultFailureStateMachine.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Failures/Default/DefaultFailureStateMachine.cs
Outdated
Show resolved
Hide resolved
| { | ||
| NumOfDataPoints = initalPoints; | ||
| NumOfFailedDataPoints = initalPoints; | ||
| State = new DefaultFailureState((int)initalState); |
There was a problem hiding this comment.
Yeah we have to discuss that. I still cannot think of something elegant.
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/OperatorType.cs
Outdated
Show resolved
Hide resolved
| /// </summary> | ||
| /// <typeparam name="T">The data type of the message</typeparam> | ||
| [Unstable("0.16", "API may change")] | ||
| public interface ISender<T> |
There was a problem hiding this comment.
Should we make T contravariant? i.e.
public interface ISender<in T>There was a problem hiding this comment.
I made it contravariant, but I am not sure about the use case. In the sense that users will specify anyway the type on the driver side. Did you have any specific use case in mind?
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/ElasticOperator.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/ElasticOperator.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/ElasticOperator.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/ElasticOperator.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/ElasticOperator.cs
Outdated
Show resolved
Hide resolved
motus
left a comment
There was a problem hiding this comment.
Done with the logical operators. Switching to the physical ones now..
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/ElasticOperator.cs
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/ElasticOperator.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/ElasticOperator.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/ElasticOperator.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/Default/DefaultBroadcast.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/Default/DefaultOneToN.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/Default/DefaultOneToN.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/Default/DefaultOneToN.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/Default/DefaultEmpty.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/Default/DefaultEmpty.cs
Outdated
Show resolved
Hide resolved
motus
left a comment
There was a problem hiding this comment.
halfway through the Task package; will add more comments tomorrow.
lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Physical/IElasticIterator.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/TaskToDriverMessageDispatcher.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/TaskToDriverMessageDispatcher.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/Default/IDefaultTaskToDrivermessages.cs
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/ElasticDriverMessageHandler.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/Default/DefaultTaskToDriverMessageDispatcher.cs
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/Default/DefaultTaskToDriverMessageDispatcher.cs
Outdated
Show resolved
Hide resolved
| Buffer.BlockCopy( | ||
| BitConverter.GetBytes((ushort)TaskMessageType.JoinTopology), 0, message, offset, sizeof(ushort)); | ||
| offset += sizeof(ushort); | ||
| Buffer.BlockCopy(BitConverter.GetBytes((ushort)operatorId), 0, message, offset, sizeof(ushort)); |
There was a problem hiding this comment.
we should probably think about using protobuf instead of our own serialization. Let's discuss it later.
lang/cs/Org.Apache.REEF.Network/Elastic/Task/NodeObserverIdentifier.cs
Outdated
Show resolved
Hide resolved
|
|
||
| IIdentifier destId = _idFactory.Create(destination); | ||
|
|
||
| for (int retry = 0; !Send(destId, message); retry++) |
There was a problem hiding this comment.
extra space before !Send(); also, this code looks like another good candidate to be replaced with retry policy mechanism
There was a problem hiding this comment.
By the way, I read up on the retry policies available for .NET, and it looks like the Microsoft.Practices.TransientFaultHandling I've mentioned earlier (and used elsewhere in REEF) is now deprecated. The best replacement seems to be the Polly project. So we can use MS Practices now as we already have the dependency, and later switch to Polly (in a separate PR, of course)
lang/cs/Org.Apache.REEF.Network/Elastic/Task/CommunicationLayer.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/CommunicationLayer.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/Default/DefaultCommunicationLayer.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/Default/DefaultCommunicationLayer.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/Default/DefaultElasticContext.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/Default/DefaultElasticContext.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/Default/DefaultElasticContext.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/Default/DefaultElasticContext.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Task/CommunicationLayer.cs
Outdated
Show resolved
Hide resolved
...e.REEF.Network/Elastic/Topology/Physical/Default/OperatorTopologyWithDefaultCommunication.cs
Outdated
Show resolved
Hide resolved
...e.REEF.Network/Elastic/Topology/Physical/Default/OperatorTopologyWithDefaultCommunication.cs
Outdated
Show resolved
Hide resolved
...e.REEF.Network/Elastic/Topology/Physical/Default/OperatorTopologyWithDefaultCommunication.cs
Outdated
Show resolved
Hide resolved
...e.REEF.Network/Elastic/Topology/Physical/Default/OperatorTopologyWithDefaultCommunication.cs
Outdated
Show resolved
Hide resolved
...e.REEF.Network/Elastic/Topology/Physical/Default/OperatorTopologyWithDefaultCommunication.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/FlatTopology.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/FlatTopology.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/FlatTopology.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/FlatTopology.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/FlatTopology.cs
Outdated
Show resolved
Hide resolved
motus
left a comment
There was a problem hiding this comment.
a few minor suggestions. will add more soon
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/FlatTopology.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/FlatTopology.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/FlatTopology.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/FlatTopology.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/FlatTopology.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/FlatTopology.cs
Outdated
Show resolved
Hide resolved
motus
left a comment
There was a problem hiding this comment.
left a few comments on ITopology and its implementations
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/FlatTopology.cs
Show resolved
Hide resolved
| } | ||
|
|
||
| // This is required later in order to build the topology | ||
| if (_taskStage == string.Empty) |
There was a problem hiding this comment.
_taskStage is now null by default. maybe,
_taskStage = _taskStage ?? Utils.GetTaskStages(taskId);There was a problem hiding this comment.
But this will do a check and reassignment every time instead of just a check, no?
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/FlatTopology.cs
Outdated
Show resolved
Hide resolved
| output += rep + " "; | ||
| } | ||
|
|
||
| return output; |
There was a problem hiding this comment.
shorter version:
return _rootId + "\n" + string.Join(" ", _root.Children.Select(
node => node.FailState == DataNodeState.Reachable ? "" + node.TaskId : "X"));(by the way, shouldn't we print IDs of unreachable tasks? say, $"UNREACHABLE:{node.TaskId}"?)
There was a problem hiding this comment.
I dunno. The failing node should be already present in the log because the task manager logs it once it receives a failure. This is more sort of like "hey we got a failure, this is the new state of world".
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/FlatTopology.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/DataNode.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/DataNode.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/ITopology.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/ITopology.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/Impl/FlatTopology.cs
Outdated
Show resolved
Hide resolved
motus
left a comment
There was a problem hiding this comment.
Few more comments, and that concludes my second full pass over the entire code
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Physical/OperatorTopology.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Physical/OperatorTopology.cs
Outdated
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Physical/DriverAwareOperatorTopology.cs
Outdated
Show resolved
Hide resolved
...e.REEF.Network/Elastic/Topology/Physical/Default/OperatorTopologyWithDefaultCommunication.cs
Outdated
Show resolved
Hide resolved
| protected readonly ConcurrentQueue<ElasticGroupCommunicationMessage> _sendQueue = | ||
| new ConcurrentQueue<ElasticGroupCommunicationMessage>(); | ||
| protected readonly BlockingCollection<ElasticGroupCommunicationMessage> _messageQueue = | ||
| new BlockingCollection<ElasticGroupCommunicationMessage>(); |
There was a problem hiding this comment.
trim trailing spaces, add blank line after each multiline declaration
There was a problem hiding this comment.
Wait, no indentation on multiline declarations?
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Physical/Default/OneToNTopology.cs
Show resolved
Hide resolved
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Physical/Default/OneToNTopology.cs
Outdated
Show resolved
Hide resolved
|
|
||
| #region Empty Handlers | ||
|
|
||
| public void OnError(Exception error) |
There was a problem hiding this comment.
is there a chance we'll need to override these handlers in derived classes? if yes, we should probably we make them virtual
There was a problem hiding this comment.
I don't think OnError etc. will be ever used because I believe errors will surface directly as crashes.
lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Physical/Default/OneToNTopology.cs
Outdated
Show resolved
Hide resolved
| { | ||
| var childTaskId = Utils.BuildTaskId(StageName, child); | ||
|
|
||
| _children.TryAdd(child, childTaskId); |
There was a problem hiding this comment.
in fact, we can pass children into the OperatorTopologyWithDefaultCommunication constructor and initialize the dictionary once:
protected OperatorTopologyWithDefaultCommunication(
// ...
int disposeTimeout,
IEnumerable<KeyValuePair<int, string>> children = null)
{
_children = children == null
? new ConcurrentDictionary<int, string>()
: new ConcurrentDictionary<int, string>(children);and here do something like
: base(
// ...
disposeTimeout,
children.Select(child =>
new KeyValuePair<int, string>(child, Utils.BuildTaskId(stageName, child))))There was a problem hiding this comment.
I am not sure about this. I want to maintain the population of _children in the subclass because it is operator specific.
|
@motus I am checking the protobuf thing and is not that easy to do. I mean, I can probably change the serialization/deserialization of messages to use protobuff, but to completely use protobuff end to end I will have to go and change the network service (which is not my code) and this may require some indefinite amount of time. Do you think it is ok if I only try protobuff for my messages? |
|
Hi Matteo,
Glad that you are back to REEF! Thanks for looking at it. I meant only the
messages that you create and serialize on your own, not all the Wake stuff.
I am on vacation toll Wednesday - let's discuss it next week when I am back!
Thank you,
Sergiy.
…On Tue, Jul 16, 2019 at 5:22 PM Matteo Interlandi ***@***.***> wrote:
@motus <https://github.com/motus> I am checking the protobuf thing and is
not that easy to do. I mean, I can probably change the
serialization/deserialization of messages to use protobuff, but to
completely use protobuff end to end I will have to go and change the
network service (which is not my code) and this may require some indefinite
amount of time. Do you think it is ok if I only try protobuff for my
messages?
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#1487?email_source=notifications&email_token=AAAHVFCQMSEEVFNR24PNZBTP7ZQ43A5CNFSM4GNZUE7KYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOD2CUH2Q#issuecomment-512050154>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AAAHVFHM7ER6NYM4KTXP2A3P7ZQ43ANCNFSM4GNZUE7A>
.
|
This PR introduces the new elastic group communication framework. For the moment only the broadcast operator with flat topology is implemented. Further operators and topologies will be added in successive PRs.
This PR uses some of the pieces of PR #1479 so the latter should be merged before merging the former.
JIRA:
REEF-2054 Elastic Broadcast