How to program an Atomic model in PowerDEVS

In this guide we are going to learn how to program a new atomic model in PowerDEVS. The new model will improve the representation of EventFilter for the simple model we built in the Getting Started guide.

It assumes you already have installed and configured PowerDEVS correcty, and that you read and have a good background of DEVS theory. Also, that you have done the Getting Started guide.

In the first step we will show how to implement the 5 DEVS methods (init, dint, dext, lambda and exit), reading parameters from the configuration file and loging debug string to a log file. In the second step, logging variabes to scilab is demostrated.

Before starting - What we will model?

We are going to continue with the modeling of the TDAQ Phase I we started in the Getting Started guide. That model was focused on the StorageHandler (SH), specifically on the amount of storage required to keep all necessary data.
In this guide, we will concentrate on the EventFilter (EF) to be able to answer some of these questions:

  1. What is the load of the EF server farm?
  2. How many servers (and cores) will be needed?
  3. Does the load balancing of events affect latency?
In that first guide we modeled the EF as a processor which filters events at a certain rate. But this model processes events one by one, only a single event is being processed at the EF at a time, but this is not realistic. More over, this way of modeling the EF is useful for answering questions about the SH, but not the questions about the EF. In the real system, the EF is implemented with several servers where each server can process multiple events at the same time.

NOTE: The best way to model this would be using VectorialDEVS, which allows to create several instances of the same model. This way you can have multiple ProcessorSharing models (each representing one server), each configure to process a maximum amount of events in parallel (this would be the amount of core in each server). But doing this would be too easy to implement as it can be done dragging and dropping models from the library, and the purpose of this guide is to learn how to implement new atomic models.
So, in this guide we will implement an intermediate solution: a single model which will represent all servers.

In this guide we will implement a new atomic model to represent non-sharing servers: a server that can process certain amount of jobs at the same time, assigning each job the same processing time (independently of how many other jobs are being processed).

Step 1 - Basic DEVS functions, parameters and debug logging

Model

Model setup

We will start with the model we built in the Getting Started guide. If you don't have it, you can find it in the getting_started branch in GIT repository (git checkout -b getting_started), in the folder /examples/HowTos/GettingStated. We recommend not working in that model, but instead to make a copy of it (for example to /examples/<YourName>/NewAtomicModelGuide)

Add the new model

  1. Open the model in PowerDEVS (File --> Open --> <ThePathWereYouHaveTheModel>)
  2. Delete the EventFilter model. We will replace it with the new one.
  3. Add a new Atomic model (drag&drop from the Basic Elements library). Name it EventFilter.
  4. Configure the model ports and parameters
    1. Right-click --> Edit
    2. In the Properties tab, verify that the model has 1 input port (to receive new jobs) and 1 outport (so send finished jobs)
    3. In the Parameters tab, add a new parameter serviceCapacity (computing power assigned to each jobs). Set Type=String, Value=EventFilter.servicePower
    4. In the Parameters tab, add a new parameter maxCapacity (maximum simultaneous jobs that can be processed). Set Type=String, Value=EventFilter.maxJobs
  5. Connect the first output of the SH to the input of the EF.
  6. Connect the output of the EF to the second input of the SH.





  7. Add the c++ files to implement the new behavior. Add the In the Code tab, select the Atomic/GettingStarted folder and click New File
    This will open the PowerDEVS code editor, which has different tab for each of the DEVS functions (init, TA, int, ext, lambda and exit). On the right panel you can declare state variables.

  8. File->Save, and name the new file NonSharingProcessorServer
    This will create the .cpp and .h files with the basic structure to implement an atomic model: A new class (with the name of the file you choose) that implements the virtual Simulator class. All methods will be empty (init, ta, dint, dext, lamda and exit). In the next section we will add logic to these methods.

    NOTE TO DEVELOPERS: The PowerDEVS code editor is good for students learning DEVS, but you can use any editor you like to edit c++ files (it is just a normal c++ makefile project). For example, here you can find a guide on How to configure PowerDEVS in eclipse.
    Note that if you edit the code in an external editor, the PowerDEVS editor might stop working.

Programming the behavior

Before we start programming, lets discuss a bit more in details the behavior we want. When a new jobs arrives to be processed we want the model to keep it until it finishes processing it. The processing time for each job will depend on the jobWeight (finishTime = arrivalTime + (jobWeight/computingPower), as the same computing power is assigned to each job independently of how many more jobs are being processed at the moment (this is in general not true, and particularly not true for the real EF. But we will start modeling it this way to learn how to program atomic models). When the finishTime for the job is reached, we want to send the job through the output port. Also, because the queue works with a pull model, the server will need to request at the begging as many jobs as its capacity. Then, each time it finishes a jobs, a new request should be sent (but we will use the same finished job output to signal these requests).

  1. Open the NonSharingProcessorServer.h file you create before.
  2. These is a normal c++ class. So we will need to add the necessary #includes:

    #include "sinks/ParameterReader.h" // To read parameters from the config file
    #include "queueingTheory/Jobs/Job.h" // job class declaration
    #include <deque>


  3. Lets declare the variables that will make the State of the model:

    class NonSharingProcessorServer: public Simulator {

    typedef std::shared_ptr<Job> Job_ptr;

    // Parameters
    double serviceTime = -1;
    double maxCapacity = -1;

    // state variables
    double sigma;
    std::deque<std::pair<Job_ptr, double> > queuedJobs;

    The first 2 are to hold the parameters. Sigma will hold the time for the TA function (it is a normal practice to have this variable in all models as it simplifies the implementation of TA).
    Will use a dequeue to hold all the jobs which are currently being processed and their corresponding finish time.


  4. Switch to the NonSharingProcessorServer.cpp file
  5. In the Init function, we need to read the parameters from the configuration, and set up the initial values
    1. To read the parameters we use the readDefaultParameterValue<T>(paramName) function. As you can see the first argument is the parameter name, which is retrieved from the model specification (the one we set in the GUI) that is passes to the atomic model as variadic arguments.

      It is important here that the number of times you call va_arg should be the same as the amount of parameters you declare in the GUI

      void NonSharingProcessorServer::init(double t,...) {
      va_list parameters;
      va_start(parameters,t);

      char* fvar;
      fvar = va_arg(parameters,char*);
      this->serviceTime = readDefaultParameterValue<double>(fvar);
      printLog(1, "[%f] %s: serviceTime: %f \n",t, this->getName(), this->serviceTime);

      fvar = va_arg(parameters,char*);
      this->maxCapacity = readDefaultParameterValue<double>(fvar);
      printLog(1, "[%f] %s: maxCapacity: %f simultaneous jobs . \n",t, this->getName(), this->maxCapacity);

      Note the use of printLog(logLevel, message, ...). This function is used to produce textual logging to the pdevs.log file during execution and is sometimes useful during development as a debugging tool.
      For production simulations, you should set the priority high, so as not to produce a lot of I/O to disk.


    2. The initial state of the model, should be to output job requests to the queue.
      So we set sigma=0 (to produce an internalTransition immediately), and we will queue maxCapacity faked jobs with finishTime 0 to produce the requests.
      Add the following code inside the init function, after reading the parameters.

      // Initialize state
      this->sigma = 0; // start requesting jobs
      auto initJobRequest = std::make_shared<Job>(0); // a moke job to signal the first request to the queue.
      for(int i=0; i < this->maxCapacity; i++){
      printLog(1, "[%f] %s: Adding moke job. Size=%u \n",t, this->getName(), this->queuedJobs.size());
      this->queuedJobs.push_back({initJobRequest, t});
      }


  6. In the ta (timeAdvance) function, we will simply return the sigma variable:

    double NonSharingProcessorServer::ta(double t) {
    //This function returns a double.
    return this->sigma;
    }

  7. In the dext we should handle when a new event arrives to the model. In our case, when a new job arrives.

    void NonSharingProcessorServer::dext(Event x, double t) {

    auto newJob = castEventPointer<Job>(x, this);

    if(this->queuedJobs.size() < this->maxCapacity){ // there is space, add new job
    // add the new job into the queue
    this->addNewJob(t, newJob);

    // next sigma
    this->sigma = this->queuedJobs.front().second - t;
    printLog(1 , "[%f] %s: New Job #%i (weight=%f) added for processing. total jobs=%i, nextFinishTime=%f \n",t, this->getName(), newJob->getId(), newJob->getWeight(), this->queuedJobs.size(), this->queuedJobs.front().second);
    } else { // program discard
    printLog(1, "[%f] %s: Not enough capacity to process Job #%i. Job discarded \n",t, this->getName(), newJob->getId());
    this->sigma = this->sigma - this->e; // Continues as before
    }
    }

    The castEventPointer<Job>(event, model) method is use to get the object that the event contains, in our case the actual job to process.
    If there is enough capacity, we process the job. If not we just discard it. For our model, the model will never discard jobs, but just in case we handle that situation too.
    If we process the job we do 2 things: first we add the job to the queue (we will program the addNewJob method in the next step), and second we set the new sigma (The finishTime of the first job minus the current time).


  8. In the addNewJob method we should add the job to the queue in a sorted manner: jobs with smaller finishTime fist, so as to guarantee that the next jobs to finish is always at the begging.
    Add the following code at the end of the file:

    void NonSharingProcessorServer::addNewJob(double currentTime, std::shared_ptr<Job> newJob){
    double finishTime = currentTime + (newJob->getWeight() / this->serviceTime);

    // insert in corresponding index to keep the queue sorted
    for(auto jobInQueue = this->queuedJobs.begin(); jobInQueue < this->queuedJobs.end(); jobInQueue++){
    if(jobInQueue->second > finishTime ){ // if we found the index where to insert the new one
    this->queuedJobs.insert(jobInQueue, {newJob, finishTime});
    return;
    }
    }

    // If position was not found, insert it at the very end
    this->queuedJobs.push_back({newJob, finishTime});
    }


    NOTE: this sorted insertion can make the simulation really slow if the model will hold O(10K) events. There are more efficient ways to implement the sorting, but this is not the focus of this guide. Also, with a more realistic implementation using VectorialDEVS (a model for each server) sorting so many events is not necessary (each server would have O(10) cores maximum).


    Add the new addNewJob method heather to the .h file:
    private:
    void addNewJob(double currentTime, Job_ptr newJob);
  9. In the lambda function, we should sent the finished jobs. Now that we know that new jobs are inserted sorted in the queue, the lambda behavior is streightforward: just send the first job in the queue:

    Event NonSharingProcessorServer::lambda(double t) {
    auto finishedJob = this->queuedJobs.front().first;
    printLog(1, "[%f] %s: Sending finished job #%u. total jobs=%i \n",t, this->getName(), finishedJob->getId(), this->queuedJobs.size());
    return Event(this->queuedJobs.front().first, 0);
    }

  10. The dint function is executed right after the lamda function. Here we should change the state of the model: pop in the queue to remove the jobs we just sent, and update sigma:

    void NonSharingProcessorServer::dint(double t) {
    // remove sent job from the queue
    this->queuedJobs.pop_front();

    // set the TA for next internal transition with the finish time of the next job (if there is one)
    this->sigma = std::numeric_limits<double>::max();
    if(! this->queuedJobs.empty()){
    this->sigma = this->queuedJobs.front().second - t;
    }
    }

We have finished programming the behavior of our model by implementing the 5 functions required by DEVS: init, dint, dext, lambda and exit

Testing the implementation

To start with, we will use the debug log to valiate that we programmed as expected. One day a very very bright student will develop the infrastructure to create DEVS unit tests so that we can specify automated test to validate the behavior of new models.

  1. Set the parameters in the model.scilabParams file as follows:
    DataHandler.period_mu = 1/(400); // mean for the exponential distribution
    DataHandler.jobSize_value = 0.3; // value for the constant distribution

    EventFilter.servicePower = 1; // Processing Power
    EventFilter.maxJobs = 3; // Max amount of jobs

    This is setting maximum 3 jobs. We set jobs to be generated with jobWeight=0.3 and the EF to process with servicePower=1 (so each jobs should take 0.3 s to process).
  2. Run the simulation from the PowerDEVS GUI. You can run a few seconds (100s for example).
  3. Open the output/pdevs_run0.log file. This is where all the printLog(..) messages are written.
  4. You can read what is written in this file.
    At the begging of the file there are some details of what is executing, the parameters of each model the execution times of the init, simulation. At the end of the file, you should find execution times for the exit function and total execution time. Also some data of how many variables were loggued and the time it took to log them.
    In the middle you will find the things that we printed with the EventFilter model. Verify the following"
    1. At time [0.000000], 3 lines with the fake jobs and its corresponding requests.
    2. Then, at different times. when the eventFilter receives the new jobs. The times are random and corresponds to the generation times. You can verify here the total jobs increasing, also the finishTime set to arrivalTime+0.3s
    3. After you can verify that there are the messages from the lamda function for the output of finished should. Verify that they correspond to the arrivalTime+0.3s
    4. When the finished jobs is sent, at the same time (but in the log file line just below), you should see that the queue sent a new job to start processing.
    5. This is an extract of my log file:
[0.000000] EventFilter: serviceTime: 1.000000
[0.000000] EventFilter: maxCapacity: 3.000000 simultaneous jobs .
[0.000000] EventFilter: Adding moke job. Size=0
[0.000000] EventFilter: Adding moke job. Size=1
[0.000000] EventFilter: Adding moke job. Size=2
[0.000000] EventFilter: Sending finished job #1. total jobs=3
[0.000000] EventFilter: Sending finished job #1. total jobs=2
[0.000000] EventFilter: Sending finished job #1. total jobs=1
[0.020654] EventFilter: New Job #2 (weight=0.300000) added for processing. total jobs=1, nextFinishTime=0.320654
[0.021099] EventFilter: New Job #3 (weight=0.300000) added for processing. total jobs=2, nextFinishTime=0.320654
[0.021929] EventFilter: New Job #4 (weight=0.300000) added for processing. total jobs=3, nextFinishTime=0.320654
[0.320654] EventFilter: Sending finished job #2. total jobs=3
[0.320654] EventFilter: New Job #5 (weight=0.300000) added for processing. total jobs=3, nextFinishTime=0.321099
[0.321099] EventFilter: Sending finished job #3. total jobs=3
[0.321099] EventFilter: New Job #6 (weight=0.300000) added for processing. total jobs=3, nextFinishTime=0.321929
[0.321929] EventFilter: Sending finished job #4. total jobs=3

Step 2 - Adding logging to Scilab

Writing things to the log file can be useful for debugging purposes, but it can be really slow to execute as it is writing to disk. Also, it is very hard to process and analyze this kind of free-text logs.
In this step we will turn off the logging to disk and log to some variables to Scilab.
We will log the mean serviceTime, totalFinished jobs, and the amount of simultaneous jobs processing.

  1. Change all the calls to printLog, to set the logging level to LOG_LEVEL_FULL_LOGGING: printLog(LOG_LEVEL_FULL_LOGGING, ..... )
    Do this for each call to pringLog in the NonSharingProcessorServer.cpp file. This will turn off logging to disk.
  2. In the NonSharingProcessorServer.h file we will add some variables to hold the counter of finished jobs and a logger object which we will use to log our variables:

    // Helpers
    std::shared_ptr<IPowerDEVSLogger> logger;
    uint totalFinished = 0;

    NOTE: The logguer is a pointer to an abstract class. There are many implementations of loggers.


  3. In the init function, add the following lines at the end to initialize the logger.

    this->logger = createDefaultLogger(SCILAB_LOG_LEVEL_ALWAYS, this->getName());
    this->logger->initSignals(3,"finished", "serviceTime", "currentJobs");

    NOTE: Using the createDefaultLogger function, creates the logger according to the configuration, which is now set to use an instance of aLoggerToScilab. There there are other implementatios, for example OpenTsdbLogger which logs to a Time-Series-Database.
  4. In the exit function we should let the logger to flush all data:

    void ProcesorSharingServer::exit() {
    this->logger->flush();
    }
  5. Add code to log these variables when they change:
    1. In the dint function:

      void NonSharingProcessorServer::dint(double t) {
      // remove sent job from the queue
      this->queuedJobs.pop_front();

      //logging
      this->logger->logSignal(t, this->queuedJobs.size(), "currentJobs");

      // set the TA for next internal transition with the finish time of the next job (if there is one)
      this->sigma = std::numeric_limits<double>::max();
      if(! this->queuedJobs.empty()){
      this->sigma = this->queuedJobs.front().second - t;
      }
      }

    2. In the dext function:

      if(this->queuedJobs.size() < this->maxCapacity){ // there is space, add new job
      // add the new job into the queue
      this->addNewJob(t, newJob);

      // next sigma
      this->sigma = this->queuedJobs.front().second - t;

      //logging
      this->logger->logSignal(t, this->queuedJobs.size(), "currentJobs");

      printLog(LOG_LEVEL_FULL_LOGGING , "[%f] %s: New Job #%i (weight=%f) added for processing. total jobs=%i, nextFinishTime=%f \n",t, this->getName(), newJob->getId(), newJob->getWeight(), this->queuedJobs.size(), this->queuedJobs.front().second);
      }

    3. In the lambda function:

      Event NonSharingProcessorServer::lambda(double t) {
      auto finishedJob = this->queuedJobs.front().first;

      //logging
      this->totalFinished++;
      this->logger->logSignal(t, this->totalFinished, "finished");

      ....
      }
    4. In the addNewJob function:

      void NonSharingProcessorServer::addNewJob(double currentTime, std::shared_ptr<Job> newJob){
      double finishTime = currentTime + (newJob->getWeight() / this->serviceTime);

      //logging
      this->logger->logSignal(currentTime, finishTime - currentTime, "processingTime");

      ....
      }

Validation - Analysis in Scilab

  1. Now you can run longer simulations as you are not writing to disk all the time. Change parameters so to make it more realistic: 10.000 cores (simultaneos jobs). Jobs with a random pareto distribution of mean 1, and servicePower=0.3 (in average each job will take 0.3s to process, but there will be lots of short running jobs and few very long running jobs).

    EventFilter.maxJobs = 10000; // Max amount of jobs
    EventFilter.servicePower = 1 / 0.3;
    DataHandler.jobSize = DISTRIBUTION _ PARETO; // (distribution) size of generated jobs
    DataHandler.jobSize_shape = 2.1;
    DataHandler.jobSize_scale = 0.5238095;
    DataHandler.period_mu = 1/400;

  1. Run the simulation. You can simulate 1000 s.
  2. Switch to Scilab
  3. Let check how much time each event took to process. To get the mean processing time: mean(EventFilter_processingTi)
    The mean processing time should be close to 0.3.
  4. But processing time is now different for each event, so lets see the distribution of the processing time in an histogram:
    scf(); histplot(100, EventFilter_processingTi, normalization=%f);



    And to switch to log scale: scf();histplot(100, EventFilter_processingTi, normalization=%f, logflag='nl', rect=[0,0.1, 120, 1000000]);



    There are many events (~10⁶) that finished is less than a second and few which took a lot to process. There was one occurance of an event that took 103s to process. But still this does not affect the mean processing time of 0.3s
  5. Lets check the EF server farm usage (ie used cores in the farm). We expect them to be 100% under use.
    scf(); plot(EventFilter _t, EventFilter _currentJobs, ".-")


    As you can see from the plot, at the begging the cores start getting used quickly. But then, the farm is under utilized: the average core utilization is ~120 (with a maxium of 180 cores) which is 0.08% usage.
    This means that at the current configured rate, it is enough with 180 cores to filter all events.
    You can check that the StorageHandler queue is empty all the time: scf(); plot(SHSampler_t, SHSampler_max, ".-")

  6. Run another simulation, but setting a higher rate for the DH and less cores.
    DataHandler.period_mu = 1/(4000);
    EventFilter.maxJobs = 1000;

  7. Plot as before the EF cores usage, and the SH queue usage :
    scf(); plot(EventFilter _t, EventFilter _currentJobs, ".-")
    scf(); plot(SHSampler_t, SHSampler_max, ".-")




    As you can see now, the farm gets to 100% usage very quickly. Also, the queue in the SF starts getting filled as expected.
  8. What is the filtering rate now?
    Now we did not set the overall rate for the EF, but instead configured how much each event delays to be processe, so the overall rate will then depend on how many servers we configured.
    We can calculate the total output rate:
    max(EventFilter_finished) / max(EventFilter_t)
    ans = 3343.0703

    So the output rate was 3.3KHz events. Which makes sense as the generation rate was configured at 4Khz and the SF queue grows at ~1Khz

    And we can plot how many events were finished over time: scf(); plot(EventFilter_t, EventFilter_finished, ".");

Next Steps

  1. The simulation is going too slow at the moment. One reason is that we are logging values at the EF each time an event arrives or finishes. You could add samplers (as in the first getting started) to speed up the simulation.
  2. In the real system, when serveral events are being processed in the same server, the performance degrades (mainly because cores share network resources). To model this, you could use a ProcessorSharing model to represent each server (as in the first getting started), and VectorialDEVS to have serveral instances of the model (one per server). For that purpose, you will need to use a load balancing strategy (JobScheduler model) to decide to which server to send next job.

-- MatiasAlejandroBonaventura - 2016-05-02

Edit | Attach | Watch | Print version | History: r4 < r3 < r2 < r1 | Backlinks | Raw View | WYSIWYG | More topic actions
Topic revision: r4 - 2016-05-03 - MatiasAlejandroBonaventura
 
    • Cern Search Icon Cern Search
    • TWiki Search Icon TWiki Search
    • Google Search Icon Google Search

    Main All webs login

This site is powered by the TWiki collaboration platform Powered by PerlCopyright &© 2008-2024 by the contributing authors. All material on this collaboration platform is the property of the contributing authors.
or Ideas, requests, problems regarding TWiki? use Discourse or Send feedback