data_source.rb 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. require "digest/md5"
  2. module Blazer
  3. class DataSource
  4. extend Forwardable
  5. attr_reader :id, :settings, :adapter, :adapter_instance
  6. def_delegators :adapter_instance, :schema, :tables, :preview_statement, :reconnect, :cost, :explain
  7. def initialize(id, settings)
  8. @id = id
  9. @settings = settings
  10. unless settings["url"] || Rails.env.development?
  11. raise Blazer::Error, "Empty url"
  12. end
  13. @adapter_instance =
  14. case adapter
  15. when "sql"
  16. Blazer::Adapters::SqlAdapter.new(self)
  17. when "elasticsearch"
  18. Blazer::Adapters::ElasticsearchAdapter.new(self)
  19. when "mongodb"
  20. Blazer::Adapters::MongodbAdapter.new(self)
  21. else
  22. raise Blazer::Error, "Unknown adapter"
  23. end
  24. end
  25. def adapter
  26. settings["adapter"] || detect_adapter
  27. end
  28. def name
  29. settings["name"] || @id
  30. end
  31. def linked_columns
  32. settings["linked_columns"] || {}
  33. end
  34. def smart_columns
  35. settings["smart_columns"] || {}
  36. end
  37. def smart_variables
  38. settings["smart_variables"] || {}
  39. end
  40. def variable_defaults
  41. settings["variable_defaults"] || {}
  42. end
  43. def timeout
  44. settings["timeout"]
  45. end
  46. def cache
  47. @cache ||= begin
  48. if settings["cache"].is_a?(Hash)
  49. settings["cache"]
  50. elsif settings["cache"]
  51. {
  52. "mode" => "all",
  53. "expires_in" => settings["cache"]
  54. }
  55. else
  56. {
  57. "mode" => "off"
  58. }
  59. end
  60. end
  61. end
  62. def cache_mode
  63. cache["mode"]
  64. end
  65. def cache_expires_in
  66. (cache["expires_in"] || 60).to_f
  67. end
  68. def cache_slow_threshold
  69. (cache["slow_threshold"] || 15).to_f
  70. end
  71. def local_time_suffix
  72. @local_time_suffix ||= Array(settings["local_time_suffix"])
  73. end
  74. def read_cache(cache_key)
  75. value = Blazer.cache.read(cache_key)
  76. if value
  77. Blazer::Result.new(self, *Marshal.load(value), nil)
  78. end
  79. end
  80. def run_results(run_id)
  81. read_cache(run_cache_key(run_id))
  82. end
  83. def delete_results(run_id)
  84. Blazer.cache.delete(run_cache_key(run_id))
  85. end
  86. def run_statement(statement, options = {})
  87. run_id = options[:run_id]
  88. result = nil
  89. if cache_mode != "off" && !options[:refresh_cache]
  90. result = read_cache(statement_cache_key(statement))
  91. end
  92. unless result
  93. comment = "blazer"
  94. if options[:user].respond_to?(:id)
  95. comment << ",user_id:#{options[:user].id}"
  96. end
  97. if options[:user].respond_to?(Blazer.user_name)
  98. # only include letters, numbers, and spaces to prevent injection
  99. comment << ",user_name:#{options[:user].send(Blazer.user_name).to_s.gsub(/[^a-zA-Z0-9 ]/, "")}"
  100. end
  101. if options[:query].respond_to?(:id)
  102. comment << ",query_id:#{options[:query].id}"
  103. end
  104. if options[:check]
  105. comment << ",check_id:#{options[:check].id},check_emails:#{options[:check].emails}"
  106. end
  107. result = run_statement_helper(statement, comment, options[:run_id])
  108. end
  109. result
  110. end
  111. def clear_cache(statement)
  112. Blazer.cache.delete(statement_cache_key(statement))
  113. end
  114. def cache_key(key)
  115. (["blazer", "v4"] + key).join("/")
  116. end
  117. def statement_cache_key(statement)
  118. cache_key(["statement", id, Digest::MD5.hexdigest(statement)])
  119. end
  120. def run_cache_key(run_id)
  121. cache_key(["run", run_id])
  122. end
  123. protected
  124. def run_statement_helper(statement, comment, run_id)
  125. start_time = Time.now
  126. columns, rows, error = @adapter_instance.run_statement(statement, comment)
  127. duration = Time.now - start_time
  128. cache_data = nil
  129. cache = !error && (cache_mode == "all" || (cache_mode == "slow" && duration >= cache_slow_threshold))
  130. if cache || run_id
  131. cache_data = Marshal.dump([columns, rows, error, cache ? Time.now : nil]) rescue nil
  132. end
  133. if cache && cache_data
  134. Blazer.cache.write(statement_cache_key(statement), cache_data, expires_in: cache_expires_in.to_f * 60)
  135. end
  136. if run_id
  137. unless cache_data
  138. error = "Error storing the results of this query :("
  139. cache_data = Marshal.dump([[], [], error, nil])
  140. end
  141. Blazer.cache.write(run_cache_key(run_id), cache_data, expires_in: 30.seconds)
  142. end
  143. Blazer::Result.new(self, columns, rows, error, nil, cache && !cache_data.nil?)
  144. end
  145. def detect_adapter
  146. if settings["url"].to_s.start_with?("mongodb://")
  147. "mongodb"
  148. else
  149. "sql"
  150. end
  151. end
  152. end
  153. end