data_source.rb 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. require "digest/md5"
  module Blazer
    # A configured database Blazer can run queries against.
    #
    # Each instance builds its own anonymous subclass of Blazer::Connection and
    # calls establish_connection on it with the data source's "url", so every
    # data source talks to its own database.
    class DataSource
      attr_reader :id, :settings, :connection_model

      # @param id [String] identifier; used as the fallback display name and in cache keys
      # @param settings [Hash] string-keyed configuration ("url", "name", "cache", "timeout", ...)
      # @raise [Blazer::Error] when no "url" is configured outside the development environment
      def initialize(id, settings)
        @id = id
        @settings = settings

        unless settings["url"] || Rails.env.development?
          raise Blazer::Error, "Empty url"
        end

        @connection_model =
          Class.new(Blazer::Connection) do
            # Class.new yields an anonymous class; give each one a unique name
            # (NOTE(review): presumably because ActiveRecord expects a model name).
            def self.name
              "Blazer::Connection::#{object_id}"
            end
            establish_connection(settings["url"]) if settings["url"]
          end
      end

      # Display name; falls back to the id when "name" is not configured.
      def name
        settings["name"] || @id
      end

      def linked_columns
        settings["linked_columns"] || {}
      end

      def smart_columns
        settings["smart_columns"] || {}
      end

      def smart_variables
        settings["smart_variables"] || {}
      end

      def variable_defaults
        settings["variable_defaults"] || {}
      end

      # Statement timeout in seconds (converted to milliseconds when applied), or nil.
      def timeout
        settings["timeout"]
      end

      # Normalized cache configuration (memoized).
      #
      # A Hash "cache" setting is used as-is; any other truthy value is taken as
      # "expires_in" (minutes) with mode "all"; a missing value means mode "off".
      def cache
        @cache ||=
          if settings["cache"].is_a?(Hash)
            settings["cache"]
          elsif settings["cache"]
            { "mode" => "all", "expires_in" => settings["cache"] }
          else
            { "mode" => "off" }
          end
      end

      # "all", "slow" or "off" (nil when a Hash config omits "mode").
      def cache_mode
        cache["mode"]
      end

      # Cache lifetime in minutes (default 60).
      def cache_expires_in
        (cache["expires_in"] || 60).to_f
      end

      # In "slow" mode, results are cached only if the query took at least
      # this many seconds (default 15).
      def cache_slow_threshold
        (cache["slow_threshold"] || 15).to_f
      end

      def local_time_suffix
        @local_time_suffix ||= Array(settings["local_time_suffix"])
      end

      # Statements run inside a rolled-back transaction unless "use_transaction" is set.
      def use_transaction?
        settings.key?("use_transaction") ? settings["use_transaction"] : true
      end

      # Planner cost estimate via EXPLAIN (PostgreSQL and Redshift only).
      #
      # @param statement [String]
      # @return [String, nil] upper bound of the top plan node's cost (e.g. "35.50"),
      #   or nil when unsupported, unparseable, or when EXPLAIN fails
      def cost(statement)
        return unless postgresql? || redshift?

        begin
          result = connection_model.connection.select_all("EXPLAIN #{statement}")
          # plan lines contain "(cost=<startup>..<total> rows=... width=...)";
          # the ".." separator must be matched literally
          match = /cost=\d+\.\d+\.\.(\d+\.\d+) /.match(result.rows.first.first)
          match[1] if match
        rescue ActiveRecord::StatementInvalid
          # best effort: a statement that cannot be explained simply has no cost
          nil
        end
      end

      # Runs a user-facing statement: applies Blazer.transform_statement, records an
      # audit when Blazer.audit is enabled, executes via run_statement, and (unless
      # the run timed out) updates the state of the query's checks.
      #
      # @param statement [String]
      # @param options [Hash] :query, :user, :refresh_cache (passed through to run_statement)
      # @return [Array] [columns, rows, error, cached_at, just_cached]
      def run_main_statement(statement, options = {})
        query = options[:query]

        # The transform's return value is discarded, so it only has an effect
        # if it mutates statement in place.
        Blazer.transform_statement.call(self, statement) if Blazer.transform_statement

        if Blazer.audit
          audit = Blazer::Audit.new(statement: statement)
          audit.query = query
          audit.data_source = id
          audit.user = options[:user]
          audit.save!
        end

        start_time = monotonic_now
        columns, rows, error, cached_at, just_cached = run_statement(statement, options.merge(with_just_cached: true))
        duration = monotonic_now - start_time

        if Blazer.audit
          # optional audit columns are only filled in when the model has them
          audit.duration = duration if audit.respond_to?(:duration=)
          audit.error = error if audit.respond_to?(:error=)
          audit.timed_out = error == Blazer::TIMEOUT_MESSAGE if audit.respond_to?(:timed_out=)
          audit.cached = cached_at.present? if audit.respond_to?(:cached=)
          # only run an extra EXPLAIN for uncached queries that took 10s or more
          audit.cost = cost(statement) if !cached_at && duration >= 10 && audit.respond_to?(:cost=)
          audit.save! if audit.changed?
        end

        # timed-out runs are excluded from check state updates
        if query && error != Blazer::TIMEOUT_MESSAGE
          query.checks.each { |check| check.update_state(rows, error) }
        end

        [columns, rows, error, cached_at, just_cached]
      end

      # Executes statement, serving it from Blazer.cache when caching is enabled.
      #
      # @param statement [String]
      # @param options [Hash] :refresh_cache skips the cache read; :user and :query are
      #   tagged into a trailing SQL comment; :with_just_cached appends just_cached
      # @return [Array] [columns, rows, error, cached_at] (+ just_cached when requested)
      def run_statement(statement, options = {})
        columns = nil
        rows = nil
        error = nil
        cached_at = nil
        just_cached = false

        # `cache` is always a (truthy) Hash, so gate on the mode: with caching
        # "off" a stale entry from an earlier configuration must not be served.
        if cache_mode != "off" && !options[:refresh_cache]
          value = Blazer.cache.read(cache_key(statement))
          # NOTE(review): Marshal.load trusts the cache store; keep that store private to the app
          columns, rows, cached_at = Marshal.load(value) if value
        end

        unless rows
          columns, rows, error, just_cached = run_statement_helper(statement, statement_comment(options))
        end

        output = [columns, rows, error, cached_at]
        output << just_cached if options[:with_just_cached]
        output
      end

      def clear_cache(statement)
        Blazer.cache.delete(cache_key(statement))
      end

      # Cache key namespaced by format version and data source id.
      def cache_key(statement)
        ["blazer", "v3", id, Digest::MD5.hexdigest(statement)].join("/")
      end

      # Schemas to introspect: the "schemas" setting, else the connection's schema,
      # else "public" (PostgreSQL/Redshift) or the database name.
      def schemas
        default_schema = (postgresql? || redshift?) ? "public" : connection_model.connection_config[:database]
        settings["schemas"] || [connection_model.connection_config[:schema] || default_schema]
      end

      # Sorted, de-duplicated table names found in information_schema for #schemas.
      def tables
        sql = connection_model.send(:sanitize_sql_array, ["SELECT table_name, column_name, ordinal_position, data_type FROM information_schema.columns WHERE table_schema IN (?)", schemas])
        _columns, rows = run_statement(sql)
        rows.map(&:first).uniq.sort
      end

      def postgresql?
        ["PostgreSQL", "PostGIS"].include?(adapter_name)
      end

      def redshift?
        ["Redshift"].include?(adapter_name)
      end

      def mysql?
        ["MySQL", "Mysql2", "Mysql2Spatial"].include?(adapter_name)
      end

      def reconnect
        connection_model.establish_connection(settings["url"])
      end

      protected

      # Runs statement against the database (optionally with a timeout, inside
      # in_transaction), typecasts the rows, and caches the result per cache_mode.
      #
      # @return [Array] [columns, rows, error, just_cached]
      # @raise [Blazer::TimeoutNotSupported] when a timeout is set for an unsupported adapter
      def run_statement_helper(statement, comment)
        columns = []
        rows = []
        error = nil
        start_time = monotonic_now

        in_transaction do
          begin
            if timeout
              if postgresql? || redshift?
                connection_model.connection.execute("SET statement_timeout = #{timeout.to_i * 1000}")
              elsif mysql?
                connection_model.connection.execute("SET max_execution_time = #{timeout.to_i * 1000}")
              else
                raise Blazer::TimeoutNotSupported, "Timeout not supported for #{adapter_name} adapter"
              end
            end

            result = connection_model.connection.select_all("#{statement} /*#{comment}*/")
            columns = result.columns
            column_types = result.column_types
            cast_method = Rails::VERSION::MAJOR < 5 ? :type_cast : :cast_value

            rows = result.rows.map do |untyped_row|
              if column_types.empty?
                untyped_row
              else
                columns.each_with_index.map do |column, i|
                  raw = untyped_row[i]
                  type = column_types[column]
                  # cast only when there is a value and a known type; this keeps
                  # nil/false intact and tolerates columns missing from column_types
                  raw && type ? type.send(cast_method, raw) : raw
                end
              end
            end
          rescue ActiveRecord::StatementInvalid => e
            error = e.message.sub(/.+ERROR: /, "")
            error = Blazer::TIMEOUT_MESSAGE if Blazer::TIMEOUT_ERRORS.any? { |msg| error.include?(msg) }
          end
        end

        duration = monotonic_now - start_time
        just_cached = false
        if !error && (cache_mode == "all" || (cache_mode == "slow" && duration >= cache_slow_threshold))
          # expires_in is configured in minutes; the cache store is given seconds
          Blazer.cache.write(cache_key(statement), Marshal.dump([columns, rows, Time.now]), expires_in: cache_expires_in * 60)
          just_cached = true
        end
        [columns, rows, error, just_cached]
      end

      def adapter_name
        connection_model.connection.adapter_name
      end

      # Yields inside a transaction that is always rolled back when
      # use_transaction? is true; otherwise yields directly.
      def in_transaction
        if use_transaction?
          connection_model.transaction do
            yield
            raise ActiveRecord::Rollback
          end
        else
          yield
        end
      end

      private

      # Builds the "blazer[,user_id:..][,user_name:..][,query_id:..]" tag appended to SQL.
      def statement_comment(options)
        user = options[:user]
        query = options[:query]
        parts = ["blazer"]
        parts << "user_id:#{user.id}" if user.respond_to?(:id)
        if user.respond_to?(Blazer.user_name)
          # only include letters, numbers, and spaces to prevent injection
          parts << "user_name:#{user.send(Blazer.user_name).to_s.gsub(/[^a-zA-Z0-9 ]/, "")}"
        end
        parts << "query_id:#{query.id}" if query.respond_to?(:id)
        parts.join(",")
      end

      # Seconds from a monotonic clock; immune to wall-clock adjustments when timing queries.
      def monotonic_now
        Process.clock_gettime(Process::CLOCK_MONOTONIC)
      end
    end
  end