-
Notifications
You must be signed in to change notification settings - Fork 1
/
multiprocess_testing.py
145 lines (110 loc) · 5.5 KB
/
multiprocess_testing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import time
import multiprocessing
from Classes.filtrations import Filtration
from Classes.menu import menu
from Classes.PCPDS_manager import PCPDS_Manager
import Classes.file_manager as file_manager
import os
def process_run():
    """Interactively build persistence diagrams for a collection of pcpds objects.

    Prompts for a collection directory (re-prompting until
    ``pcpds_manager.get_collection_dir()`` returns a truthy value), a
    filtration method (0 = Rips, 1 = Upper Star, 2 = Lower Star) and whether
    to use multiprocessing.  Every entry listed in the collection directory is
    then passed to ``generate_persistence_diagram`` and the elapsed processing
    time is printed.
    """
    pcpds_manager = PCPDS_Manager()

    # Ask for the collection directory until the manager accepts it.
    print("Enter the Collection of pcpds objects you wish to generate persistance diagramsfor.")
    collection = menu.get_input("Directory: ")
    pcpds_manager.get_path_manager().set_cur_dir(collection)
    valid = pcpds_manager.get_collection_dir()
    while not valid:
        print("Invalid collection name:", pcpds_manager.get_path_manager().get_cur_dir() ,"try again.", valid)
        collection = menu.get_input("Directory: ")
        pcpds_manager.get_path_manager().set_cur_dir(collection)
        valid = pcpds_manager.get_collection_dir()

    print("Valid Directory Confirmed:", pcpds_manager.get_path_manager().get_full_cur_dir())

    # Ask for the filtration method until a value in 0..2 is entered.
    print("Choose a filtration method: [0] Rips, [1] Upper Star, [2] Lower Star.")
    choice = menu.get_int_input()
    while not 0 <= choice <= 2:
        print("Please enter a valid number between 0-2.")
        choice = menu.get_int_input()

    # Compare by value: `is` on int literals only works through CPython's
    # small-integer cache and raises a SyntaxWarning on Python 3.8+.
    filtration = None
    if choice == 0:
        filtration = Filtration.get_rips_diagram
    elif choice == 1:
        filtration = Filtration.get_upper_star
    elif choice == 2:
        filtration = Filtration.get_lower_star

    print("Would you like to use multi-processing to attempt to speed things up? [0] No. [1] Yes.")
    multiproc = menu.get_int_input()

    # Start the timer only after the last prompt so that the reported duration
    # measures processing, not how long the user took to answer.
    start_time = time.time()

    file_names = os.listdir(pcpds_manager.get_path_manager().get_full_cur_dir_var(collection))

    if multiproc:
        # Run the files in batches of one process per CPU.  All processes of a
        # batch are started before any is joined; joining right after each
        # start would execute the files strictly one after another.
        batch_size = multiprocessing.cpu_count()
        for offset in range(0, len(file_names), batch_size):
            workers = [
                multiprocessing.Process(
                    target=generate_persistence_diagram,
                    args=(pcpds_manager, file_name, filtration),
                )
                for file_name in file_names[offset:offset + batch_size]
            ]
            for worker in workers:
                worker.start()
            # join() returns once the child has exited, so no terminate() is needed.
            for worker in workers:
                worker.join()
    else:
        # Process the point clouds into persistence diagrams sequentially.
        for file_name in file_names:
            generate_persistence_diagram(pcpds_manager, file_name, filtration)

    print("Finished filtrating persistance diagrams for files in: ", str(time.time() - start_time))
def pool_run():
    """Interactively build persistence diagrams using a ``multiprocessing.Pool``.

    Prompts for a collection directory (re-prompting until
    ``pcpds_manager.get_collection_dir()`` returns a truthy value) and a
    filtration method (0 = Rips, 1 = Upper Star, 2 = Lower Star).  Every entry
    listed in the collection directory is then passed to
    ``generate_persistence_diagram`` by the pool's worker processes and the
    elapsed time is printed.
    """
    pcpds_manager = PCPDS_Manager()

    # Ask for the collection directory until the manager accepts it.
    print("Enter the Collection of pcpds objects you wish to generate persistance diagramsfor.")
    collection = menu.get_input("Directory: ")
    pcpds_manager.get_path_manager().set_cur_dir(collection)
    valid = pcpds_manager.get_collection_dir()
    while not valid:
        print("Invalid collection name:", pcpds_manager.get_path_manager().get_cur_dir() ,"try again.", valid)
        collection = menu.get_input("Directory: ")
        pcpds_manager.get_path_manager().set_cur_dir(collection)
        valid = pcpds_manager.get_collection_dir()

    print("Valid Directory Confirmed:", pcpds_manager.get_path_manager().get_full_cur_dir())

    # Ask for the filtration method until a value in 0..2 is entered.
    print("Choose a filtration method: [0] Rips, [1] Upper Star, [2] Lower Star.")
    choice = menu.get_int_input()
    while not 0 <= choice <= 2:
        print("Please enter a valid number between 0-2.")
        choice = menu.get_int_input()

    # Compare by value: `is` on int literals only works through CPython's
    # small-integer cache and raises a SyntaxWarning on Python 3.8+.
    filtration = None
    if choice == 0:
        filtration = Filtration.get_rips_diagram
    elif choice == 1:
        filtration = Filtration.get_upper_star
    elif choice == 2:
        filtration = Filtration.get_lower_star

    # Start timer
    start_time = time.time()

    # TODO: Add filter for '.json' objects as it will have problems on macs otherwise?
    directory = pcpds_manager.get_path_manager().get_full_cur_dir_var(collection)

    # One argument tuple per file.  starmap unpacks each tuple into the
    # positional parameters of generate_persistence_diagram, which Pool.map
    # cannot do (its third positional argument is `chunksize`).
    # NOTE(review): the arguments are sent to worker processes, so this assumes
    # pcpds_manager and the filtration callable are picklable - confirm.
    jobs = [(pcpds_manager, file_name, filtration) for file_name in os.listdir(directory)]

    # Pool() without arguments picks its worker count from the machine's CPU
    # count.  starmap blocks until every job (including what used to be the
    # unprocessed final partial batch) has finished, so leaving the `with`
    # block is safe.
    with multiprocessing.Pool() as pool:
        pool.starmap(generate_persistence_diagram, jobs)

    print("Finished filtrating persistance diagrams for files in: ", str(time.time() - start_time))
def generate_persistence_diagram(pcpds_manager, file, filteration):
    """Load one pcpds object, apply a filtration to it and save the result.

    ``file`` is joined onto the manager's current full directory to locate the
    object; the value returned by ``filteration`` is saved back into that same
    directory under the object's cell ID.
    """
    source_dir = pcpds_manager.get_path_manager().get_full_cur_dir()
    loaded = file_manager.load(os.path.join(source_dir, file))

    # TODO: Add capability to select filtration method using abstract function stuff.
    diagram = filteration(loaded)

    target_dir = pcpds_manager.get_path_manager().get_full_cur_dir()
    cell_id = loaded.get_cellID()
    file_manager.save(diagram, target_dir, cell_id)

    print(file, "filtrated & Saved.")
def get_processor_count():
    """Return the CPU count reported by ``multiprocessing.cpu_count()``."""
    cpu_total = multiprocessing.cpu_count()
    return cpu_total
# Print the detected CPU count. As a top-level statement this executes every
# time the module is run or imported.
# NOTE(review): with multiprocessing start methods that re-import the main
# module in child processes this may print once per child - confirm whether
# that is intended or whether it belongs under an `if __name__ == "__main__":` guard.
print(get_processor_count())