# druid_adapter.rb (2.2 KB)

  module Blazer
    module Adapters
      # Adapter that runs SQL against Druid's HTTP SQL endpoint
      # (<settings["url"]>/druid/v2/sql/) and exposes table/schema metadata
      # by querying INFORMATION_SCHEMA.
      class DruidAdapter < BaseAdapter
        # Matches strings shaped like "2020-01-01T00:00:00.000Z" (whole string,
        # millisecond precision, literal "Z" suffix). Used to spot timestamp
        # values in results, since the JSON response carries them as strings.
        TIMESTAMP_REGEX = /\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z\z/

        # POSTs +statement+ to the Druid SQL endpoint and normalizes the reply.
        #
        # @param statement SQL text, sent as the "query" field of the JSON body
        # @param comment accepted but not used by this method
        # @return [Array] a three-element array [columns, rows, error]:
        #   columns are the keys of the first result row, rows are arrays of
        #   values, and error is nil on success or a message string otherwise.
        #   Exceptions (StandardError) raised while performing the request or
        #   parsing the response are not propagated; their message becomes error.
        def run_statement(statement, comment)
          columns = []
          rows = []
          error = nil

          header = {"Content-Type" => "application/json", "Accept" => "application/json"}
          # Falls back to 300 when the data source has no timeout configured.
          timeout = data_source.timeout ? data_source.timeout.to_i : 300
          data = {
            query: statement,
            context: {
              # The same timeout, scaled by 1000 for the query context.
              timeout: timeout * 1000
            }
          }

          uri = URI.parse("#{settings["url"]}/druid/v2/sql/")
          http = Net::HTTP.new(uri.host, uri.port)
          # Net::HTTP speaks plain HTTP unless told otherwise; without this an
          # https:// URL would be contacted unencrypted on port 443 and fail.
          http.use_ssl = uri.scheme == "https"
          http.read_timeout = timeout

          begin
            response = JSON.parse(http.post(uri.request_uri, data.to_json, header).body)
            if response.is_a?(Hash)
              # A Hash (rather than an Array of rows) is treated as an error payload.
              error = response["errorMessage"] || "Unknown error: #{response.inspect}"
              error = Blazer::TIMEOUT_MESSAGE if error.include?("timed out")
            else
              columns = (response.first || {}).keys
              rows = response.map(&:values)

              # Druid doesn't return column types
              # and no timestamp type in JSON,
              # so timestamp-looking strings are converted to Time in place.
              rows.each do |row|
                row.each_with_index do |v, i|
                  row[i] = Time.parse(v) if v.is_a?(String) && TIMESTAMP_REGEX.match(v)
                end
              end
            end
          rescue => e
            error = e.message
          end

          [columns, rows, error]
        end

        # Lists table names outside the INFORMATION_SCHEMA schema, ordered by name.
        #
        # @return [Array] the first column (TABLE_NAME) of every result row
        def tables
          result = data_source.run_statement("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA NOT IN ('INFORMATION_SCHEMA') ORDER BY TABLE_NAME")
          result.rows.map(&:first)
        end

        # Describes every table outside the INFORMATION_SCHEMA schema.
        #
        # @return [Array<Hash>] one hash per (schema, table) pair with keys
        #   :schema, :table and :columns; each column is {name:, data_type:}
        #   and columns are sorted by column name. ORDINAL_POSITION is selected
        #   by the query but not used here.
        def schema
          result = data_source.run_statement("SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA NOT IN ('INFORMATION_SCHEMA') ORDER BY 1, 2")
          result.rows.group_by { |r| [r[0], r[1]] }.map do |(table_schema, table_name), column_rows|
            {
              schema: table_schema,
              table: table_name,
              columns: column_rows.sort_by { |c| c[2] }.map { |c| {name: c[2], data_type: c[3]} }
            }
          end
        end

        # Template for a preview query; "{table}" is a literal placeholder in
        # the returned string (presumably substituted by the caller).
        #
        # @return [String]
        def preview_statement
          "SELECT * FROM {table} LIMIT 10"
        end
      end
    end
  end