Hallo Zusammen,
ich habe mehrere größere Dataframes mit 3 Mio. - 33 Mio. Einträgen.
Durch das größte Dataframe liegt mein Auslastung des RAMs bei ca. 80%, weshalb ich den Zugriff auf die Dataframes jedem Kindprozess ermöglichen möchte, statt jeder Prozess die Dataframes eigen einläd.
Dafür habe ich die Lösung nach folgendem StackOverflow-Betiräg umgesetzt:
https://stackoverflow.com/questions/22487296/multiprocessing-in-python-sharing-large-object-e-g-pandas-dataframe-between
def startMultiprocessingMultiple(numberOfInstances,function,datasetToSplit,*datasets):
print("INFO: Multiprocessing with Instances: " + str(numberOfInstances))
part = int(len(datasetToSplit)/numberOfInstances)
usedData = []
instances = []
n = 0
mgr = Manager()
ns = mgr.Namespace()
while n < len(datasets):
if n == 0:
ns.a = datasets[n]
elif n == 1:
ns.b = datasets[n]
elif n == 2:
ns.c = datasets[n]
elif n == 3:
ns.d = datasets[n]
n = n + 1
i = 0
while i < numberOfInstances:
usedData = []
if i == 0:
#print(":"+str(part))
usedData = datasetToSplit[:part]
elif i == (numberOfInstances - 1):
#print(str(part*i)+":")
usedData = datasetToSplit[(part*i):]
else:
#print(str(part*i) +":"+ str(part*(i+1)))
usedData = datasetToSplit[(part*i):(part*(i+1))]
instances.append(mp.Process(target=function,
args=(i, usedData,ns)))
instances[i].start()
print("INFO: Start Instance "+ str(i) + " with PID: " + str(instances[i].pid))
i = i + 1
for instance in instances:
instance.join()
print("INFO: Instance " + str(instance.pid) +" finished")
print("INFO: All Instances finished")
Ausschnitt aus meiner angezielten Funktion:
def mergeBuslinesAndGPS(idInstance,lines,nameSpace):
busGPS = nameSpace.a
buslinesGPS = nameSpace.b
smartCardMetroBus = nameSpace.c
Leider erhalte ich beim jedem ausführen einen - meist - anderen Fehler: OSError, EOFError, BrokenPipeError, Connection reset by peer ...
NFO: Start Enviroment for mergeBuslinesAndGPS
INFO: Multiprocessing with Instances: 2
INFO: Start Instance 0 with PID: 3951
INFO: Start Instance 1 with PID: 3952
Process Process-3:
Traceback (most recent call last):
File "/usr/local/anaconda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/usr/local/anaconda/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/.../Source/Functions/DataPreprocessing/GPS.py", line 21, in mergeBuslinesAndGPS
busGPS = nameSpace.a
File "/usr/local/anaconda/lib/python3.7/multiprocessing/managers.py", line 1122, in __getattr__
return callmethod('__getattribute__', (key,))
File "/usr/local/anaconda/lib/python3.7/multiprocessing/managers.py", line 819, in _callmethod
kind, result = conn.recv()
File "/usr/local/anaconda/lib/python3.7/multiprocessing/connection.py", line 250, in recv
buf = self._recv_bytes()
File "/usr/local/anaconda/lib/python3.7/multiprocessing/connection.py", line 411, in _recv_bytes
return self._recv(size)
File "/usr/local/anaconda/lib/python3.7/multiprocessing/connection.py", line 385, in _recv
raise OSError("got end of file during message")
OSError: got end of file during message
Process Process-2:
Traceback (most recent call last):
File "/usr/local/anaconda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/usr/local/anaconda/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/.../Source/Functions/DataPreprocessing/GPS.py", line 22, in mergeBuslinesAndGPS
buslinesGPS = nameSpace.b
File "/usr/local/anaconda/lib/python3.7/multiprocessing/managers.py", line 1122, in __getattr__
return callmethod('__getattribute__', (key,))
File "/usr/local/anaconda/lib/python3.7/multiprocessing/managers.py", line 818, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/local/anaconda/lib/python3.7/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/anaconda/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
self._send(header + buf)
File "/usr/local/anaconda/lib/python3.7/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
INFO: Instance 3951 finished
INFO: Instance 3952 finished
INFO: All Instances finished
Durch die unterschiedlichen Fehler habe ich leider inzwischen keinen Ansatz mehr, was das Problem sein könnte.
Alle Fehler scheinen ein Problem mit dem Zugriff auf die Dataframes anzudeuten.
Während der Ausführung habe ich keine Operationen, welche die Dataframes verändern.
Es exisiteren "nur" Such-Operationen:
splitDataset = smartCardMetroBus[smartCardMetroBus["Line"] == line]
...
busline = buslinesGPS[buslinesGPS["Line"] == str(line)]
Gibt es Ideen? ... Vorschläge? ... habe ich was "grundsätzliches" falsch gemacht?
Falls ich nur ein Prozess starte, funktioniert alles problemlos :rolleyes:
Danke für ein Feedback! 🙂
Liebe Grüße,
BigLin
P.S. mein letzter Login war am 04.09.2010... schon ein bisschen her... schön wieder hier zu sein 😛