data_source.rb 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  module Blazer
    # Wraps one configured data source: its settings hash plus a dedicated
    # connection class (a subclass of Blazer::Connection) that statements are
    # run against.
    class DataSource
      # Adapter names that #postgresql? treats as PostgreSQL-compatible.
      POSTGRESQL_ADAPTERS = ["PostgreSQL", "Redshift"].freeze

      attr_reader :id, :settings, :connection_model

      # @param id [Object] identifier of the data source; also used as the
      #   fallback for #name
      # @param settings [Hash] string-keyed settings. Keys read by this class:
      #   "url", "name", "linked_columns", "smart_columns", "smart_variables",
      #   "timeout"
      def initialize(id, settings)
        @id = id
        @settings = settings
        @connection_model =
          Class.new(Blazer::Connection) do
            # Class.new produces an anonymous class, so give it a name.
            # NOTE(review): presumably ActiveRecord needs a non-nil class name
            # for per-class connections - confirm.
            def self.name
              "Blazer::Connection::#{object_id}"
            end

            # Only data sources with a "url" get their own connection.
            establish_connection(settings["url"]) if settings["url"]
          end
      end

      # @return [Object] the configured "name", or the id when none is set
      def name
        settings["name"] || @id
      end

      # @return [Hash] the "linked_columns" setting, or an empty hash
      def linked_columns
        settings["linked_columns"] || {}
      end

      # @return [Hash] the "smart_columns" setting, or an empty hash
      def smart_columns
        settings["smart_columns"] || {}
      end

      # @return [Hash] the "smart_variables" setting, or an empty hash
      def smart_variables
        settings["smart_variables"] || {}
      end

      # @return [Object, nil] the raw "timeout" setting
      def timeout
        settings["timeout"]
      end

      # Runs +statement+ inside a transaction that is always rolled back and
      # returns the rows together with an error message, if any.
      #
      # @param statement [String] SQL to execute
      # @param options [Hash] optional :user and :query; when they respond to
      #   +id+, that id is appended to the SQL comment attached to the statement
      # @return [Array(Array<Hash>, String)] +[rows, error]+; +error+ is nil on
      #   success, and +rows+ is empty when an error occurred
      def run_statement(statement, options = {})
        rows = []
        error = nil

        connection_model.transaction do
          begin
            apply_statement_timeout
            result = connection_model.connection.select_all("#{statement} /*#{statement_comment(options)}*/")
            rows = cast_rows(result)
          rescue ActiveRecord::StatementInvalid => e
            # Keep only the text after the database's "ERROR: " marker.
            error = e.message.sub(/.+ERROR: /, "")
          rescue StandardError => e
            # Report unexpected failures through the error slot instead of
            # returning what looks like a successful, empty result.
            error = e.message
          end

          # Raised after the begin/rescue rather than from an `ensure`: an
          # `ensure` would replace any in-flight exception (including
          # Interrupt/SystemExit) with Rollback, which the transaction block
          # then swallows silently.
          raise ActiveRecord::Rollback
        end

        [rows, error]
      end

      # Lists the columns of every table in the connection's schema.
      #
      # The schema is the connection config's :schema, falling back to "public"
      # for PostgreSQL-compatible adapters and to the :database name otherwise.
      #
      # @return [Hash{String => Array<Hash>}] table name => column hashes with
      #   "column_name" and "data_type", columns ordered by ordinal position and
      #   tables ordered by name. Empty when the lookup statement fails.
      def tables
        config = connection_model.connection_config
        default_schema = postgresql? ? "public" : config[:database]
        schema = config[:schema] || default_schema

        sql = connection_model.send(:sanitize_sql_array, ["SELECT table_name, column_name, ordinal_position, data_type FROM information_schema.columns WHERE table_schema = ?", schema])
        rows, _error = run_statement(sql)

        columns_by_table = rows.group_by { |row| row["table_name"] }.map do |table, columns|
          ordered = columns.sort_by { |column| column["ordinal_position"] }
          [table, ordered.map { |column| column.slice("column_name", "data_type") }]
        end

        Hash[columns_by_table.sort_by { |table, _columns| table }]
      end

      # @return [Boolean] whether the connection's adapter is PostgreSQL or
      #   Redshift
      def postgresql?
        POSTGRESQL_ADAPTERS.include?(connection_model.connection.adapter_name)
      end

      private

      # Sets statement_timeout on PostgreSQL-compatible connections when a
      # timeout is configured. The setting is multiplied by 1000 before being
      # sent, so it is presumably expressed in seconds - confirm.
      def apply_statement_timeout
        return unless timeout && postgresql?

        connection_model.connection.execute("SET statement_timeout = #{timeout.to_i * 1000}")
      end

      # Builds the text of the SQL comment appended to every statement:
      # "blazer" plus the user and query ids when those objects expose +id+.
      def statement_comment(options)
        parts = ["blazer"]
        parts << "user_id:#{options[:user].id}" if options[:user].respond_to?(:id)
        parts << "query_id:#{options[:query].id}" if options[:query].respond_to?(:id)
        parts.join(",")
      end

      # Converts a query result into an array of hashes, type-casting each value
      # with the result's column type when one is registered for that column.
      # Values whose column has no registered type are passed through as-is.
      def cast_rows(result)
        column_types = result.column_types
        rows = []
        result.each do |untyped_row|
          rows << untyped_row.each_with_object({}) do |(column, value), row|
            type = column_types[column]
            row[column] = type ? type.send(:type_cast, value) : value
          end
        end
        rows
      end
    end
  end