forked from Altiscale/burstingsim
-
Notifications
You must be signed in to change notification settings - Fork 0
/
parser.rb.bk
executable file
·107 lines (93 loc) · 3.42 KB
/
parser.rb.bk
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
require_relative 'input_log_event'
require_relative 'events_stream'
require 'ostruct'
# Reads a delimited event log and turns its rows into InputLogEvent objects.
#
# Every row is expected to have the shape
#   timestamp,cluster_name,job_id,host_name,task_status,task_count
# Double quotes are removed, and in #parse_and_enqueue tabs are converted to
# commas so that tab-delimited files are accepted as well.
class Parser
  # @param file [String] path of the event log to read
  def initialize(file)
    @file = file
  end

  # Reads the whole file into memory.
  #
  # @return [Array<String>] every line of the file with its line terminator
  #   and all double-quote characters removed
  def parse
    # File.foreach always treats @file as a plain path; Kernel#open may spawn
    # a subprocess when the path starts with "|".
    File.foreach(@file).map { |raw| raw.chomp.delete('"') }
  end

  # Streams the file and pushes every accepted event onto the queue of
  # EventsStream.instance.
  #
  # Rows are processed in file order:
  # * blank rows are ignored;
  # * rows whose timestamp is below +start_date+ are counted and skipped;
  # * reading stops at the first row whose timestamp is above +end_date+,
  #   so any row after that point is never examined;
  # * rows whose cluster is not in +participating_clusters+ are dropped.
  #
  # A row that cannot be processed is reported on stdout and terminates the
  # process with exit status 1.
  #
  # @param start_date [#to_i] lowest accepted timestamp (printed via Time.at)
  # @param end_date [#to_i] highest accepted timestamp (printed via Time.at)
  # @param participating_clusters [#include?, #empty?] clusters to keep; an
  #   empty value keeps every cluster.
  #   NOTE(review): when a String is passed, #include? is a substring test,
  #   so "cluster1" also matches a list containing "cluster10" - confirm what
  #   callers pass and prefer an Array of names.
  # @return [Object] EventsStream.instance
  def parse_and_enqueue(start_date = '-1', end_date = '9999999999', participating_clusters = "")
    max = 0
    min = 9_999_999_999_999
    start_date = start_date.to_i
    end_date = end_date.to_i
    puts "Requested start time: #{Time.at(start_date)} - finish time: #{Time.at(end_date)}"
    puts "Loading events from file! This might take a while..."
    stream = EventsStream.instance
    skipped_events_count = 0
    lines_count = 0

    # Stream line by line instead of materialising the whole file.
    File.foreach(@file) do |raw|
      lines_count += 1
      line = raw.chomp
      begin
        line = normalize_line(line)
        # A blank row (e.g. a trailing newline) carries no event.
        next if line.strip.empty?

        event_timestamp = leading_timestamp(line)
        if event_timestamp < start_date
          puts "too early..."
          skipped_events_count += 1
          next
        elsif event_timestamp > end_date
          puts "too late..."
          break
        end

        event = create_event(line)
        next unless participating_clusters.empty? || participating_clusters.include?(event.cluster_name)

        max = event_timestamp if event_timestamp > max
        min = event_timestamp if event_timestamp < min
        stream.queue.push(event)
      rescue StandardError
        puts "Problem with line #{lines_count} with content: \"#{line}\""
        # Abort the load, signalling failure to the calling shell.
        exit(1)
      end
    end

    puts "Skipped events: #{skipped_events_count}"
    puts "Observed min time: #{Time.at(min)} - max time: #{Time.at(max)}"
    stream
  end

  # Builds an InputLogEvent from one comma-delimited row.
  #
  # The text before the first comma is passed on as the timestamp (still a
  # String); the remaining fields are passed as an array of [name, value]
  # pairs. +colloc_name+ is the second dot-separated component of host_name.
  #
  # @param line [String] "timestamp,cluster_name,job_id,host_name,task_status,task_count"
  # @return [InputLogEvent]
  # @raise [NoMethodError] when the row has no host_name field
  def create_event(line)
    timestamp, _comma, remainder = line.partition(',')
    cluster_name, job_id, host_name, task_status, task_count = remainder.split(',')
    attributes = {
      cluster_name: cluster_name,
      job_id: job_id,
      host_name: host_name,
      colloc_name: host_name.split('.')[1],
      task_status: task_status,
      task_count: task_count
    }
    InputLogEvent.new(timestamp, attributes.to_a)
  end

  private

  # Removes double quotes and turns tabs into commas.
  def normalize_line(line)
    line.delete('"').tr("\t", ',')
  end

  # Returns the text before the first comma converted with String#to_i.
  # Raises ArgumentError when the line contains no comma at all.
  def leading_timestamp(line)
    timestamp_text, comma, _rest = line.partition(',')
    raise ArgumentError, 'no comma-delimited timestamp' if comma.empty?

    timestamp_text.to_i
  end
end
# Manual smoke test, executed only when this file is run directly.
# An event-log path may be given as the first command-line argument.
if __FILE__ == $PROGRAM_NAME
  # Fall back to the original hard-coded sample when no path is supplied.
  log_path = ARGV.fetch(0, "/Users/admin/Documents/hive-results/test.csv")
  parser = Parser.new(log_path)

  # Dump the cleaned raw lines first, then the parsed event stream.
  puts parser.parse
  puts "---------"
  stream = parser.parse_and_enqueue
  stream.print
end