# result.rb
  module Blazer
    # Wraps the outcome of running a statement against a data source: the
    # column names, the row data, any error message and cache metadata. On top
    # of that raw data it derives smart-column lookups, per-column types, a
    # suggested chart type and an R-based anomaly check.
    class Result
      attr_reader :data_source, :columns, :rows, :error, :cached_at, :just_cached

      # data_source - object responding to +settings+, +smart_columns+ and
      #               +run_statement+ (as used by #boom)
      # columns     - array of column names
      # rows        - array of rows, each indexable by column position
      # error       - error message, or nil
      # cached_at   - value tested with +present?+ by #cached?
      # just_cached - stored as-is and exposed through the reader
      def initialize(data_source, columns, rows, error, cached_at, just_cached)
        @data_source = data_source
        @columns = columns
        @rows = rows
        @error = error
        @cached_at = cached_at
        @just_cached = just_cached
      end

      # True when the error equals Blazer::TIMEOUT_MESSAGE.
      def timed_out?
        error == Blazer::TIMEOUT_MESSAGE
      end

      # True when a cache timestamp is present.
      def cached?
        cached_at.present?
      end

      # Smart-column lookups, memoized.
      #
      # Returns a hash of column name => { value (as string, nil kept as nil) =>
      # label } for every column that has a smart-column definition on this data
      # source or on one it inherits smart settings from. A Hash definition is
      # used directly; any other definition is treated as a SQL template whose
      # "{value}" placeholder is replaced by the distinct non-nil values of the
      # column, and the resulting rows are used as [value, label] pairs.
      def boom
        @boom ||= columns.each_with_index.each_with_object({}) do |(key, i), lookups|
          source = smart_columns_source(key)
          next unless source

          definition = source.smart_columns[key]
          pairs =
            if definition.is_a?(Hash)
              definition
            else
              values = rows.map { |r| r[i] }.compact.uniq
              statement = ActiveRecord::Base.send(:sanitize_sql_array, [definition.sub("{value}", "(?)"), values])
              source.run_statement(statement).rows
            end

          # Keys are stringified so lookups can use +value.to_s+.
          lookups[key] = pairs.map { |k, v| [k.nil? ? k : k.to_s, v] }.to_h
        end
      end

      # Per-column type names, memoized: "string", "numeric", "time" or nil.
      #
      # The type is taken from the first truthy value found in the column (so a
      # column holding only nil/false yields nil). Columns with a smart-column
      # lookup are always "string".
      def column_types
        @column_types ||= columns.each_with_index.map do |key, i|
          sample_row = rows.find { |r| r[i] }
          value = sample_row && sample_row[i]

          if boom[key]
            "string"
          else
            case value
            when Numeric then "numeric"
            when Time, Date then "time"
            when nil then nil
            else "string"
            end
          end
        end
      end

      # Suggested chart type: "line", "line2", "bar", "bar2", "scatter" or nil.
      #
      # Memoized with +defined?+ so that a nil answer is cached too instead of
      # being recomputed on every call.
      def chart_type
        return @chart_type if defined?(@chart_type)

        @chart_type = detect_chart_type
      end

      # Runs anomaly detection over the result's time series.
      #
      # Returns [anomaly, message]. +anomaly+ is true/false when detection ran,
      # and nil when there was no data, the result is not a "line"/"line2"
      # chart, or detection raised. Errors from detection are turned into the
      # message (and re-raised in the Rails development environment).
      def detect_anomaly
        return [nil, "No data"] if rows.empty?
        return [nil, "Bad format"] unless chart_type == "line" || chart_type == "line2"

        series = anomaly_series
        anomaly = nil
        message = nil
        current_series = nil

        begin
          flagged = []
          series.each do |s|
            current_series = s[:name]
            flagged << s[:name] if anomaly?(s[:data])
          end

          # Test emptiness rather than +any?+: a series name can be nil or
          # false (an unnamed group), and +any?+ would then miss the anomaly.
          anomaly = !flagged.empty?
          message =
            if !anomaly
              "No anomalies detected"
            elsif flagged.size == 1
              "Anomaly detected in #{flagged.first}"
            else
              "Anomalies detected in #{flagged.to_sentence}"
            end
        rescue => e
          message = "#{current_series}: #{e.message}"
          raise if Rails.env.development?
        end

        [anomaly, message]
      end

      # Checks one series for an anomaly at its most recent point by running the
      # detect_anomalies.R script located next to this file.
      #
      # series - array of [timestamp, value] pairs; pairs with a nil timestamp
      #          are dropped and the rest sorted by timestamp.
      #
      # The script receives a mode ("ts" when the series spans at least two
      # weeks, otherwise "vec") and the series as CSV text, and is expected to
      # print CSV. An "x" column in the first output row is treated as an error
      # message.
      #
      # Returns true when the last point (by timestamp in "ts" mode, by 1-based
      # index in "vec" mode) is among the rows the script reported.
      # Raises RuntimeError when Rscript is missing, prints nothing, or reports
      # an error.
      def anomaly?(series)
        series = series.reject { |v| v[0].nil? }.sort_by { |v| v[0] }

        csv_str =
          CSV.generate do |csv|
            csv << ["timestamp", "count"]
            series.each { |row| csv << row }
          end

        type = series.any? && series.last.first.to_time - series.first.first.to_time >= 2.weeks ? "ts" : "vec"

        r_script = %x[which Rscript].chomp
        raise "R not found" if r_script.empty?

        # Pass an argv array so no shell is involved: paths containing spaces
        # and the multi-line CSV argument need no escaping.
        script_path = File.expand_path("../detect_anomalies.R", __FILE__)
        output = IO.popen([r_script, "--vanilla", script_path, type, csv_str], &:read)
        raise "Unknown R error" if output.empty?

        reported = CSV.parse(output, headers: true)
        failure = reported.first && reported.first["x"]
        raise failure if failure

        if type == "ts"
          reported.map { |row| Time.parse(row["timestamp"]) }.include?(series.last[0].to_time)
        else
          reported.map { |row| row["index"].to_i }.include?(series.length)
        end
      end

      private

      # First data source (this one, then those named in the
      # "inherit_smart_settings" setting) that defines a smart column for +key+.
      def smart_columns_source(key)
        inherited = Array(data_source.settings["inherit_smart_settings"]).map { |name| Blazer.data_sources[name] }
        ([data_source] + inherited).find { |ds| ds.smart_columns[key] }
      end

      # Maps the column type signature to a chart type name, or nil.
      def detect_chart_type
        types = column_types
        present = types.compact
        several = present.size >= 2
        numerics = Array.new([present.size - 1, 0].max, "numeric")

        if several && present == ["time"] + numerics
          "line"
        elsif types == ["time", "string", "numeric"]
          "line2"
        elsif several && types == ["string"] + numerics
          # NOTE: unlike the "line" check this compares the uncompacted types,
          # so any nil-typed column prevents a "bar" match.
          "bar"
        elsif types == ["string", "string", "numeric"]
          "bar2"
        elsif types == ["numeric", "numeric"]
          "scatter"
        end
      end

      # Builds the {name:, data:} series fed to #anomaly?.
      #
      # For "line" charts there is one series per column after the first; for
      # "line2" charts rows are grouped by the second column (using its smart
      # column label when one exists) and the third column supplies the values.
      def anomaly_series
        if chart_type == "line"
          columns.each_with_index.drop(1).map do |name, i|
            {name: name, data: rows.map { |r| [r[0], r[i]] }}
          end
        else
          labels = boom[columns[1]] || {}
          rows.group_by { |r| labels[r[1].to_s] || r[1] }.map do |name, group|
            {name: name, data: group.map { |r| [r[0], r[2]] }}
          end
        end
      end
    end
  end