Bebida on Batsim Notebook
WARNING: This file was superseded by chapter 5 of my thesis:
https://gitlab.inria.fr/mmercier/my-thesis/blob/master/Thesis/content/simulation.tex https://gitlab.inria.fr/mmercier/my-thesis/blob/master/Thesis/thesis.pdf
Bebida on batsim
Objectives
- We would like to validate Bebida with more parameters: size of the part reserved for Big Data only / shared / HPC only, use of a PFS or a DFS, impact of the Big Data application type and parameters (stage size, RDD replication factor, …)
- Observe interference between HPC and Big Data applications and find parameters/optimisations that minimize this interference
- Compare it with other solutions like multi-level scheduling (YARN, Mesos, Kubernetes, …) or scheduler adaptors (Intel Hadoop HPC scheduler adaptor)
- We would like to be able to use the Big Data job model for several use cases:
- Provide decision support for cluster admins, like infrastructure dimensioning, selecting the right configuration tweaks, etc.
- Study DFS Daemon interference on HPC jobs
- Study data movement and staging/prefetching for Big Data
- Study the trade-off between using a large PFS and using a DFS
Related work
FIRMAMENT
YarnSim
PDF: http://www.cs.iit.edu/~scs/psfiles/Ning_CCGrid15.pdf
Based on CODES/ROSS. The network model is based on CODES-net, with a simple mesh network model (a fat-tree model was added in another paper: http://publish.illinois.edu/science-of-security-lablet/files/2014/05/A-Modeling-and-Simulation-of-Extreme-Scale-Fat-Tree-Networks-for-HPC-Systems-and-Data-Centers.pdf)
They use a local disk model inside CODES (CODES-lsm) to simulate disk IO properly. This model takes the random and sequential performance of the disk as parameters. It is also used by FusionFS for validation.
The internal mechanisms of YARN are simulated with logical processes that talk to each other (similar to Batsim but with more network involved). Only FIFO scheduling is provided.
Users must tell YARNsim if the submitted job is computation intensive or data intensive through a user-defined configuration file
No code, not open source.
No explanation of how HDFS blocks are allocated or what model is used underneath.
It seems to model only MapReduce, with input files as parameters.
Starfish
PDF: http://www.cs.duke.edu/starfish/files/cidr11_starfish.pdf
Starfish is a cost-based model for exploring parameters of MapReduce jobs and auto-tuning Hadoop parameters.
It learns a performance model from Hadoop jobs using a profiler, then uses a what-if engine that mixes simulation and model-based optimisation to find optimal parameters.
The code disappeared from GitHub but is still available on the university web page: http://www.cs.duke.edu/starfish/release.html
It only works with the 2012 Hadoop (Hadoop v0.20).
Discussion on Big Data Job Model
To fulfill these objectives we need a model that is capable of:
1. taking into account the IO interference effects of common Big Data runtimes: at least Spark and MapReduce;
2. running malleable jobs and reflecting the cost of resources being taken from or given to a job;
3. being usable with a PFS and/or a DFS.
For IO interference (1) we need to reflect the IO/compute/sync phases of a typical Big Data application like MapReduce (MR): as SimGrid uses a fluid model, these phases should be separate jobs (for the MSG profile). Spark is more complex than MR, but we can use the same abstraction at the stage level, because each stage is separated by a shuffle phase that provides a kind of checkpoint for the application: a stage is atomic. This means that an interruption leads to recomputing only the current stage, not the previous ones. We can compute job profiles from MR and Spark traces easily because the total amount of IO, CPU, and memory used by each (Spark or MR) task is logged. With this phase-based model, preemption cost (2) can be modelled simply by killing the current phase and resubmitting it from the beginning. It becomes more complicated when only part of the job's resources are taken or given: the remaining work has to be spread over the new set of resources, which can reduce or increase the total phase execution time. This has to be managed depending on the model grain level (see below); a simple rescaling of the remaining work is sketched right after this paragraph.
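As an illustration of this resize cost, here is a minimal sketch of how the remaining work of an interrupted fluid phase could be rescaled onto a new allocation; the function name and the unit-speed assumption are illustrative, not a Batsim feature.

def remaining_phase_time(total_work_per_resource, elapsed,
                         old_nb_resources, new_nb_resources):
    """Estimate the time left in a fluid phase after a resize.

    total_work_per_resource: work (e.g. flops) initially given to each resource
    elapsed: time already spent in the phase before the resize
    old_nb_resources / new_nb_resources: allocation size before / after the resize
    """
    # Work already processed, assuming every resource progressed at unit speed.
    total = total_work_per_resource * old_nb_resources
    done = elapsed * old_nb_resources
    remaining = max(total - done, 0.0)
    # The remaining work is spread evenly over the new allocation.
    return remaining / new_nb_resources

# Example: a phase of 100 work units per resource on 8 nodes, interrupted
# after 60 s, then resumed on 4 nodes: the phase gets longer.
print(remaining_phase_time(100.0, 60.0, 8, 4))  # -> 80.0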
Here are the different possibilities for the job model, from coarse to fine grain.
File system modelling
For the file systems (3), the PFS is already modelled in Batsim and can be added to the model by using PFS job profiles for the IO phases. For the DFS we need a more complex pattern that involves data locality. We can use two levels of model:
- a coarse-grained one that only draws a random data distribution for each IO phase, e.g. 60% node-local, 25% remote inside the job, 15% remote outside the job. These profiles can be generated from empirical Bebida data on locality distribution.
- a fine-grained one that distributes a set of data blocks to each node and tracks the replicas of these blocks; IO phases would then be associated with a list of blocks to read or write.
Both can be implemented in Batsim with a profile that takes a data movement matrix, with all job nodes in one dimension and all DFS nodes in the other, giving the quantity of data transferred in bytes. It is a kind of generalisation of the PFS to all compute nodes. To simulate the bandwidth of the disks we can add to the platform file an asymmetric local link for each node in the DFS (maybe dynamically at startup?) and register it as a resource that is not part of the compute nodes but is used for the DFS. This last point needs some new features in Batsim. The rest of the model (coarse and fine grain) should live in the decision process.
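As an illustration of the coarse-grained DFS model, here is a sketch that builds such a data movement matrix from a locality distribution; the function, the 60/25/15 split, and the assumption that every job node also hosts a DFS datanode (as in Bebida) are illustrative only.

import random

def io_phase_matrix(job_nodes, dfs_nodes, total_bytes,
                    local=0.60, job_remote=0.25, outside=0.15):
    """Bytes moved between each job node (rows) and each DFS node (columns)."""
    per_node = total_bytes / len(job_nodes)
    matrix = {src: {dst: 0.0 for dst in dfs_nodes} for src in job_nodes}
    outside_nodes = [n for n in dfs_nodes if n not in job_nodes]
    for src in job_nodes:
        matrix[src][src] += per_node * local              # node-local share
        others = [n for n in job_nodes if n != src]
        if others:                                        # remote, inside the job
            matrix[src][random.choice(others)] += per_node * job_remote
        if outside_nodes:                                 # remote, outside the job
            matrix[src][random.choice(outside_nodes)] += per_node * outside
    return matrix

# Example: a 10 MB IO phase of a job running on n0 and n1, over a 4-node DFS.
matrix = io_phase_matrix(["n0", "n1"], ["n0", "n1", "n2", "n3"], 10e6)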
Job (a.k.a. Application) level
Using a sequential profile with a BSP-like job model may be a better idea because it is coarser-grained (which is what we want), and preemption can be implemented with a kill-all and resubmit. This model fits the MapReduce model very well, and sequential Batsim jobs can express more complex models (iterative MR) to better fit more complex runtimes like Spark or Flink.
To enable this, we need Batsim to provide the quantity of work already done (before the kill), so that the job can be resubmitted on another set of resources and the cost of preemption is reflected correctly. I added this feature in Batsim v1.2.0.
We also need a new job profile that takes an amount of work and spreads it evenly over the resources allocated at runtime, i.e. an MSG total-amount profile. This was added in Batsim (see this issue).
Here is an example of what a MapReduce application looks like:
{
"app1":: {
"type": "composed",
"seq": ["IO_stage","Map_stage", "Shuffle_stage", "Reduce_stage", IO_stage"]
},
"IO_stage": {
"type": "msg_par_hg_pfs0",
"size": 10e6
},
"Map_stage": {
"type": "msg_par_spread",
"cpu": 1e9,
"com": 2e5
},
"Reduce_stage": {
"type": "msg_par_spread",
"cpu": 1e6,
"com": 1e5
},
"Shuffle_stage"
"type": "msg_par_spread",
"cpu": 2e4,
"com": 5e9
}
}
To generate these profiles we can extract aggregated metrics from Spark or YARN traces. We need to create a tool to do this, but it can be based on the Bebida Spark log parsing script.
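As a sketch of what such a tool could do, the following aggregates per-stage metrics from a Spark event log (one JSON object per line). The field names follow the Spark listener event format but may differ between Spark versions; this is not the actual Bebida parsing script.

import json
from collections import defaultdict

def aggregate_stages(event_log_path):
    """Sum per-stage CPU time (ms) and IO volumes (bytes) from a Spark event log."""
    stages = defaultdict(lambda: {"cpu_ms": 0, "input_bytes": 0,
                                  "output_bytes": 0, "shuffle_bytes": 0})
    with open(event_log_path) as f:
        for line in f:
            event = json.loads(line)
            if event.get("Event") != "SparkListenerTaskEnd":
                continue
            metrics = event.get("Task Metrics", {})
            s = stages[event["Stage ID"]]
            s["cpu_ms"] += metrics.get("Executor Run Time", 0)
            s["input_bytes"] += metrics.get("Input Metrics", {}).get("Bytes Read", 0)
            s["output_bytes"] += metrics.get("Output Metrics", {}).get("Bytes Written", 0)
            s["shuffle_bytes"] += metrics.get("Shuffle Write Metrics", {}).get("Shuffle Bytes Written", 0)
    return dict(stages)

These per-stage totals can then be turned into the composed/spread profiles shown above.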
Jobs are submitted with their resource requirements as a <CPU, MEM> tuple, without walltime. We need to modify Batsim to accept this kind of job submission without complaining.
Worker (a.k.a. Executor, Container…) level
Using one job per node (executor-level model) is closer to the resource manager level (like YARN), which manages resources at this granularity.
But it has overhead, because increasing or decreasing an application's resource allocation requires updating every job contained in this allocation, in order to load-balance the remaining amount of work over the new set of resources.
Also, this level creates a synchronisation problem: if one executor is delayed, it should impact the others, and this is impossible to reflect in Batsim right now. With the implementation of the parallel job profile, however, this would make sense: sequential and parallel job composition allows stage synchronisation to be modelled with a parallel job inside each stage (no synchronisation) and a sequential job at the stage level (synchronisation between stages).
Task level
Using a task-level model is very fine-grained and leads to a complex decision process, but makes very realistic simulation possible. It can fit any task-based data-flow runtime like Spark and MapReduce, but also HPC runtimes like StarPU. The main question here is about job profiles and how this kind of job can be described.
We can define stages in a homogeneous or heterogeneous way:
Homogeneous stages can be defined by these parameters:
- Stage ID
- Stage parents IDs
- Total computation amount to be split over the tasks
- Total communication amount to be split over the tasks
- Total amount of IO to be split over the tasks
- The number of tasks to split the computation on
Here is a template of a JSON profile for defining a stage:
{
"Stage ID": 4,
"Parents IDs": [2, 3],
"Total Flops": 8e10,
"Total Communication": 10e6,
"Total IO": 1e8,
"Number of Tasks": 141,
}
Or, with a simpler delay model, the profile can contain only the time a task takes instead of the amount of resources used. For example:
{
"Stage ID": 4,
"Parents IDs": [2, 3],
"Task Average Execution Time": 35,
"Number of Tasks": 141,
}
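As an illustration, under the assumption that tasks run in waves on the available executor slots (our assumption, not a Batsim feature), the delay model above can be turned into a stage duration as follows:

import math

def stage_duration(number_of_tasks, task_avg_time, available_slots):
    # Tasks are executed in full waves over the available slots.
    waves = math.ceil(number_of_tasks / available_slots)
    return waves * task_avg_time

# 141 tasks of ~35 s each on 32 slots -> 5 waves of 35 s = 175 s.
print(stage_duration(141, 35, 32))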
The heterogeneous model can be extracted from real execution traces of Spark or MR. Thus, a stage can be defined by a map of independent task profiles with the number of occurrences of each profile, e.g.:
"stage_4": {
"Parents IDs": ["stage_2", "stage_3"],
"Tasks": ["task_23": 100, "task_24": 41],
},
"task_23": {
"type": "msg_par_hg",
"cpu": 1e9,
"com": 0
},
"task_23": {
"type": "msg_par",
"cpu": [5e6,5e6,5e6,5e6],
"com": [0,10e6,5e6,5e6,
5e6,0,5e6,5e6,
5e6,5e6,5e6,5e6,
5e6,5e6,5e6,5e6]
}
A complete job can be described as a set of stage profiles submitted together; the scheduler will run them according to their dependencies.
It can be directly implemented on top of Batsim now, with one MSG task profile per task dynamically submitted by the decision process. However, this level of model complexity/realism will take long to implement and will increase the simulation time.
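As an illustration, the decision process could keep submitting the stages whose parents have already finished, as in the following sketch (names are illustrative, not part of pybatsim):

def runnable_stages(stages, finished):
    """Return the stages whose parents have all finished."""
    return [sid for sid, profile in stages.items()
            if sid not in finished
            and all(p in finished for p in profile.get("Parents IDs", []))]

stages = {
    "stage_2": {"Parents IDs": []},
    "stage_3": {"Parents IDs": []},
    "stage_4": {"Parents IDs": ["stage_2", "stage_3"]},
}
print(runnable_stages(stages, finished={"stage_2"}))  # -> ['stage_3']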
Job model validation
We chose the job-level model because it seems the simplest to implement and it is a good trade-off between model granularity and simulation cost.
The Spark trace extractor, analysis, and model generator are provided in the following experiment folder: ./experiments/workload_generation
We have to validate this model by comparing real Spark execution with simulation. This validation is twofold:
Execution time conservation for different application traces
First we compare execution times in real conditions and in simulation, for different application traces with different parameters (application type and input size). To achieve an acceptable error on this metric we have to go through a platform calibration phase, to obtain a precise SimGrid platform definition, and a model calibration phase, to correct a possible scale problem. The goal of the model calibration is to determine a possible scaling factor to apply during model generation, in order to be confident that we retrieve the real execution time in simulation, whatever the Spark application execution traces given to the generator.
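As an illustration of the model calibration step, a single scaling factor could be fitted on (real, simulated) execution-time pairs, for instance with a least-squares fit through the origin; the fitting method and the numbers below are only illustrative assumptions, not the calibration actually performed.

def calibrate(real_times, sim_times):
    # Scaling factor minimizing the squared error of sim * factor ~ real.
    factor = sum(r * s for r, s in zip(real_times, sim_times)) / \
             sum(s * s for s in sim_times)
    relative_errors = [abs(s * factor - r) / r
                       for r, s in zip(real_times, sim_times)]
    return factor, max(relative_errors)

# Made-up numbers, for illustration only.
factor, worst_error = calibrate([120.0, 340.0, 610.0], [100.0, 300.0, 520.0])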
Execution time conservation with different platform parameters
To be able to do the model calibration we also have to confirm that the bottlenecks of the applications are conserved, and that changes to the platform (IO and network bandwidth/latency, number of nodes, number of cores per node, etc.) lead to the same changes of bottlenecks in real conditions and in simulation.
Discussion on HPC job model
#TODO define batsim job models
To be able to see the interference on HPC jobs, we need those jobs to have a precise job model too. Do we use BSP as well, or SMPI?
SMPI traces can be retrieved quite easily from the SimGrid community, but the cost in simulation complexity is very high. Moreover, we are not sure that mixing multiple job models is relevant. The major problem is that SimGrid would have to be calibrated for two different models, which can be unfeasible in practice.
We can generate more aggregated traces to get the interference effects in an MSG model without the SMPI complexity.
The first issue is to get resource usage information from real allocations. We can get this input from two sources: application profiling from real application executions, or time-independent traces from SimGrid executions (smpirun).
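As a sketch of the aggregation, a time-independent trace could be collapsed into per-rank compute and communication totals to feed an aggregated MSG parallel-task profile; the "rank action amount [dst]" line format used here is a simplification, not the exact SimGrid trace format.

from collections import defaultdict

def aggregate_tit(trace_path):
    """Collapse a simplified time-independent trace into per-rank totals."""
    flops = defaultdict(float)   # rank -> total flops
    comm = defaultdict(float)    # (src rank, dst rank) -> total bytes
    with open(trace_path) as f:
        for line in f:
            fields = line.split()
            if len(fields) < 2:
                continue
            rank, action = fields[0], fields[1]
            if action == "compute":
                flops[rank] += float(fields[2])
            elif action in ("send", "isend"):
                dst, size = fields[2], float(fields[3])
                comm[(rank, dst)] += size
    return flops, comm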
However, the SMPI approach needs to be validated: SMPI traces are already validated for one application at a time on a calibrated platform, but the usage of SMPI with multiple applications at the same time has not been compared to reality yet.
On the other hand, we have access to traces from the Froggy cluster that contain monitoring information. These traces can be used to model parallel applications, but the monitoring system present on the cluster, like traditional application resource monitoring tools, lacks information about the sources and destinations of network traffic. This information is needed to generate the precise communication matrix required by the parallel task model. A network monitoring tool, coupled to the traditional ones, could solve this issue, but it is not present in the current traces. IB? #TODO can we generate the network matrix without this information?
Moreover, as the cluster is in production, applications are already subject to uncontrolled network interference. This directly biases the model extraction process, which can hinder the model precision. One could extract, from OAR, jobs that specifically asked for isolated switches.
A solution to both problems is to run representative applications, like exascale project proxy apps, in a controlled environment with network traffic monitoring.
Now that we have the model inputs, we need to use this information to generate our application models. The first idea is to use a simpler model that accumulates each resource consumption into one parallel task job. This approach has the advantage of being simple, but it flattens the resource usage, which decreases the possibility of interference. #TODO add simple graph to explain the phenomenon. The second idea is to mimic the Big Data job model by splitting the job into stages, in order to keep the resource usage peaks (IO, CPU, communications). This means splitting the job into meaningful stages, where each stage captures a different resource usage pattern, for each kind of application. As each application is different, the aggregation (into stages) is problematic. The act of splitting an application into stages, which we call sampling, can be done with different approaches:
- Fixed-time slicing: one decides an aggregation time step to slice the application; a sample is created from each slice.
- Sample aggregation: using the previous method, one can decide to merge stages that are similar.
- Cut into a fixed number of pieces: one decides a finite number of slices, independently of the application execution time.
- Signal processing: use more advanced techniques that we do not know about yet. TODO: find out.
Each stage is created by computing the mean of the resource consumption over its time slice.
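Here is a minimal sketch of the fixed-time slicing and sample aggregation ideas, assuming a resource usage time series as input; the windowing and merge tolerance are arbitrary illustrative choices.

def slice_usage(samples, window):
    """samples: list of (time, usage); returns one mean usage value per window."""
    if not samples:
        return []
    end = samples[-1][0]
    slices = []
    t = 0.0
    while t <= end:
        in_window = [u for (ts, u) in samples if t <= ts < t + window]
        slices.append(sum(in_window) / len(in_window) if in_window else 0.0)
        t += window
    return slices

def merge_similar(slices, tolerance=0.1):
    """Merge adjacent slices whose mean usage differs by less than `tolerance`."""
    stages = []
    for value in slices:
        if stages and abs(value - stages[-1][0]) <= tolerance * max(stages[-1][0], 1e-9):
            mean, n = stages[-1]
            stages[-1] = ((mean * n + value) / (n + 1), n + 1)
        else:
            stages.append((value, 1))
    return stages  # list of (usage level, number of merged windows)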
#TODO how can we validate this work?
Implementation
Broker description
The broker is the part of the simulation that enables multiple schedulers to talk to Batsim: it routes the messages from Batsim to the different schedulers and implements the resource sharing logic (here, prolog/epilog). It could be used in the future to emulate meta-schedulers like Mesos. In detail, it receives Batsim messages and, depending on the workload, routes each message to the appropriate scheduler. It then receives the response from the scheduler and sends it back to Batsim. It is able to intercept certain messages to apply the resource sharing logic.
For the Bebida-specific case, when a job is started by the HPC scheduler, the broker asks the Big Data scheduler to decommission the associated nodes before sending the execution order to Batsim (note that this requires a new message that is not in Batsim yet). When the Big Data scheduler receives this order, it asks Batsim to kill the jobs that were running on those resources and probably resubmits a new job to recompute what was lost. Only then is the HPC scheduler's execution message sent to Batsim. This is the HPC job prolog. The epilog is simpler: when an HPC job finishes, the broker recommissions the associated nodes to the Big Data part before telling the HPC scheduler that the job is over.
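Below is a minimal sketch of this prolog/epilog logic; the class, method, and message names are illustrative assumptions and do not correspond to the actual Batsim protocol or batbroker code.

class Broker:
    def __init__(self, batsim, hpc_sched, bigdata_sched):
        self.batsim = batsim
        self.hpc_sched = hpc_sched
        self.bigdata_sched = bigdata_sched

    def on_hpc_job_ready_to_start(self, job, nodes):
        # Prolog: the Big Data side gives the nodes back before the HPC job
        # starts; it kills and resubmits whatever was running there.
        self.bigdata_sched.decommission(nodes)
        self.batsim.execute_job(job, nodes)

    def on_hpc_job_completed(self, job, nodes):
        # Epilog: hand the freed nodes to the Big Data scheduler first,
        # then notify the HPC scheduler that the job is over.
        self.bigdata_sched.recommission(nodes)
        self.hpc_sched.notify_job_completed(job)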
The broker is available here: https://gitlab.inria.fr/batsim/batbroker
Big Data scheduler description
Needed basic features:
Job allocation with constraints on resource number
- Need an interval set library
- Queue policy: FCFS
Able to ADD/REMOVE resources dynamically
- Kill and re-execute applications
- Add preemption cost (re-execute unfinished stages)
DFS + Locality:
- Implement a Pseudo DFS (see the sketch after this list):
- map of the resource/block allocation
- map of the job/block association
- Add data locality constraints to the allocation
- dynamically re-submit jobs with specific IO depending on block placement
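Below is a minimal sketch of the pseudo-DFS bookkeeping listed above (block location map, job/block association, locality-aware node selection); the data structures and default replication factor are illustrative assumptions, not the pybatsim implementation.

import random
from collections import defaultdict

class PseudoDFS:
    def __init__(self, nodes, replication=3):
        self.nodes = list(nodes)
        self.replication = replication
        self.block_locations = defaultdict(set)   # block id -> set of nodes
        self.job_blocks = defaultdict(set)        # job id -> set of block ids

    def write_block(self, job_id, block_id, writer_node):
        # First replica on the writer's node, the others on random nodes.
        replicas = {writer_node}
        while len(replicas) < min(self.replication, len(self.nodes)):
            replicas.add(random.choice(self.nodes))
        self.block_locations[block_id] |= replicas
        self.job_blocks[job_id].add(block_id)

    def local_nodes(self, job_id):
        # Nodes holding at least one block of the job: candidates for a
        # locality-aware allocation and for IO profiles with local reads.
        nodes = set()
        for block in self.job_blocks[job_id]:
            nodes |= self.block_locations[block]
        return nodes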
The scheduler is available here: https://gitlab.inria.fr/batsim/pybatsim
IO model in Batsim
A discussion on this topic is available here: https://gitlab.inria.fr/batsim/batsim/issues/17