data_source.rb 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. require "digest/md5"
  module Blazer
    # One configured Blazer data source: its string-keyed settings plus an
    # anonymous Blazer::Connection subclass that holds the database connection.
    class DataSource
      attr_reader :id, :settings, :connection_model

      # @param id [String] data source identifier (used in cache keys and audits)
      # @param settings [Hash] string-keyed settings ("url", "name", "cache", "timeout", ...)
      # @raise [Blazer::Error] when "url" is missing outside the development environment
      def initialize(id, settings)
        @id = id
        @settings = settings

        unless settings["url"] || Rails.env.development?
          raise Blazer::Error, "Empty url"
        end

        # A fresh subclass per data source, so each one establishes its own
        # connection to settings["url"]. Class.new yields an anonymous class,
        # so give it a readable name.
        @connection_model =
          Class.new(Blazer::Connection) do
            def self.name
              "Blazer::Connection::#{object_id}"
            end
            establish_connection(settings["url"]) if settings["url"]
          end
      end

      # @return [String] settings["name"], falling back to the id
      def name
        settings["name"] || @id
      end

      # @return [Hash] settings["linked_columns"], or {} when unset
      def linked_columns
        settings["linked_columns"] || {}
      end

      # @return [Hash] settings["smart_columns"], or {} when unset
      def smart_columns
        settings["smart_columns"] || {}
      end

      # @return [Hash] settings["smart_variables"], or {} when unset
      def smart_variables
        settings["smart_variables"] || {}
      end

      # @return [Hash] settings["variable_defaults"], or {} when unset
      def variable_defaults
        settings["variable_defaults"] || {}
      end

      # @return [Object, nil] settings["timeout"] in seconds (multiplied by 1000
      #   when sent to the database), or nil for no timeout
      def timeout
        settings["timeout"]
      end

      # Normalized cache configuration, memoized.
      #
      # - A Hash setting is used as-is.
      # - Any other truthy value means mode "all" with that value as
      #   "expires_in" (minutes; see the cache write below).
      # - A missing setting means mode "off".
      #
      # @return [Hash]
      def cache
        @cache ||=
          if settings["cache"].is_a?(Hash)
            settings["cache"]
          elsif settings["cache"]
            { "mode" => "all", "expires_in" => settings["cache"] }
          else
            { "mode" => "off" }
          end
      end

      # @return [String, nil] "all", "slow", "off", or whatever the Hash setting holds
      def cache_mode
        cache["mode"]
      end

      # @return [Float] cache lifetime in minutes (default 60)
      def cache_expires_in
        (cache["expires_in"] || 60).to_f
      end

      # @return [Float] seconds a query must take before "slow" mode caches it (default 15)
      def cache_slow_threshold
        (cache["slow_threshold"] || 15).to_f
      end

      # @return [Array] settings["local_time_suffix"] coerced to an array, memoized
      def local_time_suffix
        @local_time_suffix ||= Array(settings["local_time_suffix"])
      end

      # @return [Object] settings["use_transaction"] when the key is present, else true
      def use_transaction?
        settings.key?("use_transaction") ? settings["use_transaction"] : true
      end

      # Upper cost estimate from EXPLAIN output, e.g. "35.50" from
      # "cost=0.00..35.50 ".
      #
      # @return [String, nil] nil when EXPLAIN is unavailable or has no cost
      def cost(statement)
        result = explain(statement)
        # Escape the ".." separator; an unescaped ".." matches any two chars.
        match = /cost=\d+\.\d+\.\.(\d+\.\d+) /.match(result)
        match[1] if match
      end

      # Best-effort EXPLAIN. Only run for PostgreSQL/PostGIS/Redshift.
      #
      # @return [String, nil] first cell of the EXPLAIN result; nil for other
      #   adapters or on any StandardError
      def explain(statement)
        return unless postgresql? || redshift?

        connection_model.connection.select_all("EXPLAIN #{statement}").rows.first.first
      rescue StandardError
        nil
      end

      # Runs a user-facing statement and records an audit row (when
      # Blazer.audit is on). Also updates the query's checks unless the run
      # timed out.
      #
      # @param statement [String] SQL to run
      # @param options [Hash] :query, :user, :refresh_cache (forwarded to run_statement)
      # @return [Array] [columns, rows, error, cached_at, just_cached]
      def run_main_statement(statement, options = {})
        query = options[:query]

        # The return value is discarded, so a transform can only take effect
        # by mutating `statement` in place.
        Blazer.transform_statement.call(self, statement) if Blazer.transform_statement

        if Blazer.audit
          audit = Blazer::Audit.new(statement: statement)
          audit.query = query
          audit.data_source = id
          audit.user = options[:user]
          audit.save!
        end

        start_time = monotonic_now
        columns, rows, error, cached_at, just_cached = run_statement(statement, options.merge(with_just_cached: true))
        duration = monotonic_now - start_time

        if Blazer.audit
          # respond_to? guards: these audit columns may not exist in every schema.
          audit.duration = duration if audit.respond_to?(:duration=)
          audit.error = error if audit.respond_to?(:error=)
          audit.timed_out = error == Blazer::TIMEOUT_MESSAGE if audit.respond_to?(:timed_out=)
          audit.cached = cached_at.present? if audit.respond_to?(:cached=)
          # Only pay for an EXPLAIN on uncached runs that took 10s or more.
          if !cached_at && duration >= 10
            audit.cost = cost(statement) if audit.respond_to?(:cost=)
          end
          audit.save! if audit.changed?
        end

        if query && error != Blazer::TIMEOUT_MESSAGE
          query.checks.each do |check|
            check.update_state(rows, error)
          end
        end

        [columns, rows, error, cached_at, just_cached]
      end

      # Runs a statement, serving it from Blazer.cache when caching is enabled.
      #
      # @param options [Hash] :refresh_cache skips the cache read; :user and
      #   :query are tagged into a SQL comment; :with_just_cached appends the
      #   just_cached flag to the output
      # @return [Array] [columns, rows, error, cached_at] (+ just_cached)
      def run_statement(statement, options = {})
        columns = nil
        rows = nil
        error = nil
        cached_at = nil
        just_cached = false

        # `cache` is always a Hash (truthy), so the mode is what decides whether to read.
        if cache_mode != "off" && !options[:refresh_cache]
          value = Blazer.cache.read(cache_key(statement))
          # NOTE(review): assumes cache entries were written by this class
          # (Marshal.load must never see untrusted data).
          columns, rows, cached_at = Marshal.load(value) if value
        end

        unless rows
          user = options[:user]
          query = options[:query]
          comment_parts = ["blazer"]
          comment_parts << "user_id:#{user.id}" if user.respond_to?(:id)
          if user.respond_to?(Blazer.user_name)
            # only include letters, numbers, and spaces to prevent injection
            comment_parts << "user_name:#{user.send(Blazer.user_name).to_s.gsub(/[^a-zA-Z0-9 ]/, "")}"
          end
          comment_parts << "query_id:#{query.id}" if query.respond_to?(:id)
          columns, rows, error, just_cached = run_statement_helper(statement, comment_parts.join(","))
        end

        output = [columns, rows, error, cached_at]
        output << just_cached if options[:with_just_cached]
        output
      end

      # Removes the cached result for `statement`.
      def clear_cache(statement)
        Blazer.cache.delete(cache_key(statement))
      end

      # @return [String] "blazer/v3/<id>/<md5 of statement>"
      def cache_key(statement)
        ["blazer", "v3", id, Digest::MD5.hexdigest(statement)].join("/")
      end

      # @return [Array] settings["schemas"], else the connection's :schema, else
      #   "public" (PostgreSQL/Redshift) or the connection's :database
      def schemas
        default_schema = (postgresql? || redshift?) ? "public" : connection_model.connection_config[:database]
        settings["schemas"] || [connection_model.connection_config[:schema] || default_schema]
      end

      # @return [Array<String>] table names in `schemas`, sorted by name
      def tables
        sql = connection_model.send(:sanitize_sql_array, ["SELECT table_name FROM information_schema.tables WHERE table_schema IN (?) ORDER BY table_name", schemas])
        _columns, rows = run_statement(sql)
        rows.map(&:first)
      end

      def postgresql?
        ["PostgreSQL", "PostGIS"].include?(adapter_name)
      end

      def redshift?
        ["Redshift"].include?(adapter_name)
      end

      def mysql?
        ["MySQL", "Mysql2", "Mysql2Spatial"].include?(adapter_name)
      end

      # Re-establishes the connection from settings["url"].
      def reconnect
        connection_model.establish_connection(settings["url"])
      end

      protected

      # Executes the statement (with an optional timeout) and type-casts the
      # rows. Caches the result when the cache mode calls for it.
      #
      # @return [Array] [columns, rows, error, just_cached]
      # @raise [Blazer::TimeoutNotSupported] when a timeout is set for an unsupported adapter
      def run_statement_helper(statement, comment)
        columns = []
        rows = []
        error = nil
        result = nil
        start_time = monotonic_now

        begin
          in_transaction do
            if timeout
              if postgresql? || redshift?
                connection_model.connection.execute("SET statement_timeout = #{timeout.to_i * 1000}")
              elsif mysql?
                connection_model.connection.execute("SET max_execution_time = #{timeout.to_i * 1000}")
              else
                raise Blazer::TimeoutNotSupported, "Timeout not supported for #{adapter_name} adapter"
              end
            end
            result = connection_model.connection.select_all("#{statement} /*#{comment}*/")
          end
        rescue ActiveRecord::StatementInvalid => e
          error = e.message.sub(/.+ERROR: /, "")
          error = Blazer::TIMEOUT_MESSAGE if Blazer::TIMEOUT_ERRORS.any? { |timeout_error| error.include?(timeout_error) }
        end

        duration = monotonic_now - start_time

        if result
          columns = result.columns
          cast_method = Rails::VERSION::MAJOR < 5 ? :type_cast : :cast_value
          column_types = result.column_types
          rows = result.rows.map do |untyped_row|
            next untyped_row if column_types.empty?

            columns.each_with_index.map do |column, i|
              value = untyped_row[i]
              # nil check (not truthiness) so a raw `false` is cast, not turned into nil
              value.nil? ? nil : column_types[column].send(cast_method, value)
            end
          end
        end

        cache_data = nil
        if !error && (cache_mode == "all" || (cache_mode == "slow" && duration >= cache_slow_threshold))
          cache_data =
            begin
              Marshal.dump([columns, rows, Time.now])
            rescue StandardError
              # Results Marshal cannot dump are simply not cached.
              nil
            end
          # cache_expires_in is in minutes; the cache store takes seconds.
          Blazer.cache.write(cache_key(statement), cache_data, expires_in: cache_expires_in * 60) if cache_data
        end

        [columns, rows, error, !cache_data.nil?]
      end

      def adapter_name
        connection_model.connection.adapter_name
      end

      # Yields inside a transaction that is always rolled back (when
      # use_transaction? is truthy), otherwise yields directly.
      def in_transaction
        if use_transaction?
          connection_model.transaction do
            yield
            raise ActiveRecord::Rollback
          end
        else
          yield
        end
      end

      # Monotonic seconds for measuring durations (unaffected by clock changes).
      def monotonic_now
        Process.clock_gettime(Process::CLOCK_MONOTONIC)
      end
    end
  end