data_source.rb 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. require "digest/md5"
  module Blazer
    # One configured data source.
    #
    # Holds the settings hash for the source, picks and instantiates the
    # matching adapter, and runs statements through it with optional result
    # caching via +Blazer.cache+.
    class DataSource
      extend Forwardable

      # +adapter+ is defined explicitly below, so it is not listed here.
      attr_reader :id, :settings, :adapter_instance

      def_delegators :adapter_instance, :schema, :tables, :preview_statement, :reconnect, :cost, :explain, :cancel

      # @param id the identifier of this data source (also the fallback for #name)
      # @param settings [Hash] string-keyed settings ("url", "adapter", "cache", ...)
      # @raise [Blazer::Error] when no url is configured outside of the Rails
      #   development environment, or when the adapter name is not recognised
      def initialize(id, settings)
        @id = id
        @settings = settings

        if !settings["url"] && !Rails.env.development?
          raise Blazer::Error, "Empty url"
        end

        @adapter_instance =
          case adapter
          when "elasticsearch"
            Blazer::Adapters::ElasticsearchAdapter.new(self)
          when "mongodb"
            Blazer::Adapters::MongodbAdapter.new(self)
          when "presto"
            Blazer::Adapters::PrestoAdapter.new(self)
          when "sql"
            Blazer::Adapters::SqlAdapter.new(self)
          else
            raise Blazer::Error, "Unknown adapter"
          end
      end

      # @return [String] the explicitly configured adapter name, otherwise the
      #   one derived from the url scheme
      def adapter
        settings["adapter"] || detect_adapter
      end

      # @return the configured display name, falling back to the id
      def name
        settings["name"] || @id
      end

      # @return [Hash] the "linked_columns" setting, or an empty hash
      def linked_columns
        settings["linked_columns"] || {}
      end

      # @return [Hash] the "smart_columns" setting, or an empty hash
      def smart_columns
        settings["smart_columns"] || {}
      end

      # @return [Hash] the "smart_variables" setting, or an empty hash
      def smart_variables
        settings["smart_variables"] || {}
      end

      # @return [Hash] the "variable_defaults" setting, or an empty hash
      def variable_defaults
        settings["variable_defaults"] || {}
      end

      # @return the raw "timeout" setting (nil when not configured)
      def timeout
        settings["timeout"]
      end

      # Normalised cache configuration (memoized).
      #
      # A Hash setting is used as is; any other truthy setting is treated as
      # the expiry with mode "all"; a missing/false setting turns caching off.
      #
      # @return [Hash] string-keyed hash, normally containing at least "mode"
      def cache
        @cache ||= begin
          configured = settings["cache"]
          if configured.is_a?(Hash)
            configured
          elsif configured
            { "mode" => "all", "expires_in" => configured }
          else
            { "mode" => "off" }
          end
        end
      end

      # @return the cache mode; this class checks for "off", "all" and "slow"
      def cache_mode
        cache["mode"]
      end

      # @return [Float] configured expiry (default 60). It is multiplied by 60
      #   before being handed to the cache store, so presumably minutes.
      def cache_expires_in
        (cache["expires_in"] || 60).to_f
      end

      # @return [Float] duration in seconds (default 15) at or above which a
      #   statement is cached when the mode is "slow"
      def cache_slow_threshold
        (cache["slow_threshold"] || 15).to_f
      end

      # @return [Array] the "local_time_suffix" setting coerced to an array
      def local_time_suffix
        @local_time_suffix ||= Array(settings["local_time_suffix"])
      end

      # Rebuilds a result from the cache entry stored under +cache_key+.
      #
      # NOTE(review): entries are decoded with Marshal.load, which is only safe
      # while the cache store is not writable by untrusted parties.
      #
      # @return [Blazer::Result, nil] nil when nothing is stored under the key
      def read_cache(cache_key)
        value = Blazer.cache.read(cache_key)
        return unless value

        Blazer::Result.new(self, *Marshal.load(value), nil)
      end

      # @return [Blazer::Result, nil] the stored result of the given run
      def run_results(run_id)
        read_cache(run_cache_key(run_id))
      end

      # Removes the stored result of the given run from the cache.
      def delete_results(run_id)
        Blazer.cache.delete(run_cache_key(run_id))
      end

      # Runs +statement+, serving it from the cache when caching is enabled and
      # an entry exists.
      #
      # @param statement the statement to run
      # @param options [Hash] recognised keys: :user, :query, :check, :run_id,
      #   :async, :refresh_cache
      # @return [Blazer::Result]
      def run_statement(statement, options = {})
        result = nil
        if cache_mode != "off"
          if options[:refresh_cache]
            clear_cache(statement) # for checks
          else
            result = read_cache(statement_cache_key(statement))
          end
        end
        return result if result

        # the run id is only forwarded (and the result stored under it) for async runs
        run_statement_helper(statement, statement_comment(options), options[:async] ? options[:run_id] : nil)
      end

      # Drops the cached result of +statement+, if any.
      def clear_cache(statement)
        Blazer.cache.delete(statement_cache_key(statement))
      end

      # @param key [Array] key segments
      # @return [String] the segments joined with "/" behind the "blazer/v4" prefix
      def cache_key(key)
        (["blazer", "v4"] + key).join("/")
      end

      # @return [String] cache key for a statement on this data source; CRLF
      #   line endings are normalised to LF before hashing
      def statement_cache_key(statement)
        cache_key(["statement", id, Digest::MD5.hexdigest(statement.to_s.gsub("\r\n", "\n"))])
      end

      # @return [String] cache key under which the result of a run is stored
      def run_cache_key(run_id)
        cache_key(["run", run_id])
      end

      protected

      # Builds the comment string handed to the adapter together with the
      # statement, e.g. "blazer,user_id:1,query_id:2".
      #
      # NOTE(review): unlike the user name, the check emails are interpolated
      # without sanitising - confirm that this is acceptable for every adapter.
      def statement_comment(options)
        user = options[:user]
        query = options[:query]
        check = options[:check]

        parts = ["blazer"]
        parts << "user_id:#{user.id}" if user.respond_to?(:id)
        if user.respond_to?(Blazer.user_name)
          # only include letters, numbers, and spaces to prevent injection
          parts << "user_name:#{user.send(Blazer.user_name).to_s.gsub(/[^a-zA-Z0-9 ]/, "")}"
        end
        parts << "query_id:#{query.id}" if query.respond_to?(:id)
        parts << "check_id:#{check.id},check_emails:#{check.emails}" if check
        parts << "run_id:#{options[:run_id]}" if options[:run_id]
        parts.join(",")
      end

      # Executes the statement through the adapter and stores the outcome in
      # the statement cache (when the cache mode allows it) and/or under the
      # run key (when +run_id+ is given).
      #
      # @return [Blazer::Result]
      def run_statement_helper(statement, comment, run_id)
        # monotonic clock: the measured duration is immune to wall-clock adjustments
        started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        columns, rows, error = @adapter_instance.run_statement(statement, comment)
        duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

        cacheable = !error && (cache_mode == "all" || (cache_mode == "slow" && duration >= cache_slow_threshold))

        cache_data = nil
        if cacheable || run_id
          cache_data =
            begin
              Marshal.dump([columns, rows, error, cacheable ? Time.now : nil])
            rescue StandardError
              # best effort: a result that cannot be serialised is simply not stored
              nil
            end
        end

        if cacheable && cache_data
          Blazer.cache.write(statement_cache_key(statement), cache_data, expires_in: cache_expires_in * 60)
        end

        if run_id
          unless cache_data
            error = "Error storing the results of this query :("
            cache_data = Marshal.dump([[], [], error, nil])
          end
          Blazer.cache.write(run_cache_key(run_id), cache_data, expires_in: 30.seconds)
        end

        Blazer::Result.new(self, columns, rows, error, nil, cacheable && !cache_data.nil?)
      end

      # Derives the adapter name from the url scheme: "mongodb" and "presto"
      # map to themselves, anything else (including a missing url) to "sql".
      def detect_adapter
        scheme = settings["url"].to_s.split("://").first
        case scheme
        when "mongodb", "presto"
          scheme
        else
          "sql"
        end
      end
    end
  end