DSPS/purePy - Simulation/Application/Simulation.py

298 lines
11 KiB
Python

import os
import time
from pprint import pprint
import matplotlib.pyplot as plt
from Application.Server import *
from Application.Service import *
from Application.DistributionFactory import *
from queue import Queue
import pandas as pd
import numpy as np
class Simulation:
def __init__(self, schema, serviceSchema):
self.schema = schema
self.serviceSchema = serviceSchema
self.observationQueue = Queue()
self.dfDicts = {}
def main(self, profile, mapping, serviceDict, distRequest):
distributionFactory = DistributionFactory()
# https://www.jsonschemavalidator.net/
suc, e = distributionFactory.validateContentvsSchema(profile, self.schema)
if not suc:
raise Exception(e)
suc, e = distributionFactory.validateContentvsSchema(serviceDict, self.serviceSchema)
if not suc:
raise Exception(e)
self.profile = distributionFactory.getProfileAsDict(profile)
print("Generating input distribution")
eventSeries = distributionFactory.getScenarioDist(distRequest, profile, mapping)
services = self.getServices(serviceDict)
print("starting simulation")
self.simLoop(eventSeries, services, mapping)
def saveOberservations(self, dfs):
observationDict = self.observationQueueToDict()
self.observations = observationDict
self.saveSimResult(observationDict, savePath="../SimResults.json")
return observationDict
def addEvent(self, eventDict, event, ts):
if ts in eventDict:
eventDict[ts].append(event)
else:
eventDict[ts] = [event]
def simLoop(self, eventSeries, services, mapping):
done = 0
t = 0
t3 = time.time_ns()
times = set()
lastT = 0
doneInteractions = dict()
awaitedFunctions = dict()
ids = []
# primary loop see thesis chapter "simulation engine"
while len(eventSeries.keys()) > 0:
# get next timestamp and events
t = min(eventSeries.keys())
events = eventSeries.pop(t)
if t < lastT:
lastT = t
#print(len(events), events[0])
self.observationQueue.put(("sim_events", t, "error", len(events)))
continue
lastT = t
# print progression
if done % 1000 == 0:
print("simulation time: " + str(t /1E9), end="\r")
self.observationQueue.put(("sim_events", t, "total", len(events)))
# secondary loop see thesis chapter "simulation engine"
# iterate over all events
while events:
events = sorted(events)
event = events.pop()
# monitor for simulation internal error
if event.type is None:
if event.function.runtimeID in ids and event.function.start == event.function.scheduled :
raise Exception(str(event.function.runtimeID) + " Runtime ID was not unique")
else:
ids.append(event.function.runtimeID)
# monitor for simulation internal error
if t != event.t or (event.function is not None and t != event.function.start):
self.observationQueue.put(("sim_events", t, "t_not_start", 1))
print(event.function.functionID, "\n", t, "\n", event.t, "\n", event.function.start, "\n")
event.function.start = t
# retrieve completed functions and callbacks
d, ets, callbacks = services[event.serviceId].pop(t)
done += len(d)
# track completed functions for visualization
for doneFunction in d:
self.trackPop(doneInteractions, awaitedFunctions, self.observationQueue, doneFunction, t)
# create events for callbacks
for callback in callbacks:
callback.setCallbackStart(t)
serviceID = mapping[callback.functionID]
self.addEvent(eventSeries, Event(callback.start, None, serviceID, callback), callback.start)
if event.type == "recalc":
# track number of recalc events
self.observationQueue.put(("sim_events", t, "recalculations", 1))
else:
# push function if function is in event
# event has function if tape is None
# if service is busy function is returned to be referred
ets, function = services[event.serviceId].push(event.function, t)
# function is not None if the function was deferred, because the service didn't have ressources
if function is not None:
function = copy.deepcopy(function)
serviceID = mapping[function.functionID]
self.addEvent(eventSeries, Event(function.start, None, serviceID, function), function.start)
self.observationQueue.put(("sim_events", t, "rescheduled", 1))
else:
self.trackPush(awaitedFunctions, event.function)
for et in ets:
if et not in eventSeries.keys():
# recalculation events occour when a function should be finished and a server should have free ressources
# that function might have already finished and there might be multiple recalc events for a single function
self.addEvent(eventSeries, Event(et, "recalc", services[event.serviceId].serviceId, None), et)
times.add(et)
# monitor services for visualization
self.monitorServices(self.observationQueue, services, t)
# track completed interactions
for key, value in doneInteractions.items():
self.observationQueue.put(("dones", t, key, value))
#self.observationQueue.put(("dones", t, "total", 0))
self.observationQueue.put("done")
print("time: ", t/ 1E9, "completed functions: ", done, " in ", (time.time_ns() - t3) / 1E9, "s")
def saveSimResult(self, observationDict, savePath="SimResults.json"):
savePath = os.path.join(os.path.dirname(__file__), savePath)
print(savePath)
with open(savePath, 'w') as fp:
json.dump(observationDict, fp)
def getServices(self, serviceDict):
'''create service objects from service definition'''
services = dict()
for service in serviceDict["services"]:
tmpService = copy.deepcopy(service)
tmpService["defaultServer"] = copy.deepcopy(Server(**service["defaultServer"]))
services[service["serviceID"]] = Service(**tmpService)
return services
def observationQueueToDict(self, chunks=None):
columnNames = {
"sim_events": ["active"],
"dones": ["completed"],
"response_time": ["delay", "response time"],
"service_util": ["CPU", "RAM", "NET", "IO"],
}
i = 0
while True:
if chunks is not None and i >= chunks:
break
content = self.observationQueue.get()
if content == "done":
break
Simulation.transfromQueue(columnNames, content, self.dfDicts)
i += 1
return self.dfDicts
@staticmethod
def transfromQueue(columnNames, content, dfDicts):
'''transform Queue into Dicts'''
key, t, identifier, value = content
# this is bad
# I am not sure why it happens
if isinstance(value, np.ndarray):
value = value
else:
if not isinstance(value, list):
value = [value]
else:
value = value
if key not in dfDicts:
dfDicts[key] = {"t": [], "identifier": []}
dfDicts[key]["t"].append(t)
dfDicts[key]["identifier"].append(identifier)
for i, val in enumerate(value):
if i >= len(columnNames[key]):
newKey = "value_" + str(i)
else:
newKey = columnNames[key][i]
if newKey not in dfDicts[key]:
dfDicts[key][newKey] = []
dfDicts[key][newKey].append(val)
def plotResults(self, dfDicts):
dfs = dict()
for value, df in dfDicts.items():
dfs[value] = pd.DataFrame.from_dict(df)
dfs[value]["t"] = pd.to_datetime(dfs[value]["t"], unit='ns')
dfs[value].set_index("t", inplace=True)
for key, df in dfs.items():
for i in df.identifier.unique():
if key == "sim_events":
df.loc[df["identifier"] == i].resample(rule="1s").sum().interpolate().plot(kind='line', title=f"{key} {i}")
else:
df.loc[df["identifier"] == i].resample(rule="1s").mean().interpolate().plot(kind='line', title=f"{key} {i}")
plt.show()
def monitorServices(self, observationQueue, services, t):
avgUtil = [0, 0, 0, 0]
for service in services.values():
observationQueue.put(("service_util", t, "service " + service.serviceId, service.getAvgUtil()))
avgUtil = np.add(avgUtil, service.getAvgUtil())
avgUtil /= len(services)
observationQueue.put(("service_util", t, "average", avgUtil))
def trackPush(self, awaitedFunctions, function):
'''if function was first in interaction put the last functionID in the awaited functions, used to track interaction delay and time'''
interactions = self.profile["scenarios"][function.scenarioID]["interactions"]
# get first function in interaction
firstF = None
interactionFunctions = list(interactions[function.interactionID]["functions"].values())
if len(interactionFunctions) == 1:
firstF = interactionFunctions[0]["functionID"]
for f in interactionFunctions:
if f["callbacks"] != ["-1"]:
firstF = f["functionID"]
break
if function.functionID == firstF:
callbacks = function.callbacks
if callbacks == ["-1"]:
awaitedFunctions[function.runtimeID] = [function.start, function.scheduled]
return
cb2 = None
while True:
if callbacks == ["-1"]:
break
for callback in callbacks:
if callback != "-1":
cb2 = callback
callbacks = callback.callbacks
continue
awaitedRunTimeID = cb2.runtimeID
awaitedFunctions[awaitedRunTimeID] = [function.start, function.scheduled]
def trackPop(self, doneInteractions, awaitedFunctions, observationQueue, function, t):
'''remove function from awaited functions if interaction is completed'''
key = f"scenario {str(function.scenarioID)} interaction {str(function.interactionID)}"
if function.runtimeID in awaitedFunctions:
if key not in doneInteractions:
doneInteractions[key] = 0
doneInteractions[key] += 1
fStart, fScheduled = awaitedFunctions.pop(function.runtimeID)
tmp = [(fStart - fScheduled) / 1E9, (function.end - fScheduled)/ 1E9]
observationQueue.put(("response_time", t, key, tmp))