athena_adapter.rb 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  module Blazer
    module Adapters
      # Blazer adapter that runs statements on Amazon Athena and reads table
      # metadata from AWS Glue.
      class AthenaAdapter < BaseAdapter
        # Prefix of the error raised by get_query_results while a query is
        # still in progress. The rest of the message names the current state
        # (e.g. QUEUED or RUNNING), so only the prefix is matched.
        NOT_FINISHED_PREFIX = "Query has not yet finished.".freeze

        # Error message reported for a failed query; the actual reason is then
        # looked up with fetch_error.
        FAILED_MESSAGE = "Query did not finish successfully. Final query state: FAILED".freeze

        # Seconds to wait for results when the data source sets no timeout.
        DEFAULT_TIMEOUT = 300

        # Seconds to sleep between two attempts to fetch results.
        POLL_INTERVAL = 3

        # Runs +statement+ on Athena and waits for its results.
        #
        # statement - SQL string to execute.
        # comment   - accepted for interface compatibility; not used by this
        #             adapter.
        #
        # Returns [columns, rows, error]: column names, rows of cast values,
        # and an error message (nil when the query succeeded).
        # Aws::Athena::Errors::InvalidRequestException is reported through
        # +error+ instead of being raised.
        def run_statement(statement, comment)
          # loaded lazily so the digest library is only required when a query runs
          require "digest/md5"

          columns = []
          rows = []
          error = nil
          query_execution_id = nil

          begin
            resp =
              client.start_query_execution(
                query_string: statement,
                # use token so we fetch cached results after query is run
                client_request_token: request_token(statement),
                query_execution_context: {
                  database: database
                },
                result_configuration: {
                  output_location: settings["output_location"]
                }
              )
            query_execution_id = resp.query_execution_id

            resp = wait_for_results(query_execution_id)

            if resp && resp.result_set
              column_info = resp.result_set.result_set_metadata.column_info
              columns = column_info.map(&:name)

              untyped_rows = []
              # results are paginated; each iteration yields one page
              resp.each do |page|
                untyped_rows.concat(page.result_set.rows.map { |r| r.data.map(&:var_char_value) })
              end

              # the first row is skipped (presumably the header row that
              # Athena returns ahead of the data -- confirm for non-SELECT statements)
              rows = untyped_rows[1..-1] || []
              cast_rows(rows, column_info.map(&:type))
            elsif resp
              error = fetch_error(query_execution_id)
            else
              # wait_for_results returned nil: the deadline passed
              error = Blazer::TIMEOUT_MESSAGE
            end
          rescue Aws::Athena::Errors::InvalidRequestException => e
            error = e.message
            error = fetch_error(query_execution_id) if error == FAILED_MESSAGE
          end

          [columns, rows, error]
        end

        # Returns the sorted names of all tables in the configured database.
        def tables
          glue_tables.map(&:name).sort
        end

        # Returns one hash per table: {table: name, columns: [{name:, data_type:}, ...]}.
        def schema
          glue_tables.map do |t|
            {
              table: t.name,
              columns: t.storage_descriptor.columns.map { |c| {name: c.name, data_type: c.type} }
            }
          end
        end

        # Statement template used to preview a table.
        def preview_statement
          "SELECT * FROM {table} LIMIT 10"
        end

        private

        # Database name from the settings, "default" when none is configured.
        def database
          @database ||= settings["database"] || "default"
        end

        # Builds the idempotency token for start_query_execution. It covers
        # every request parameter that can differ between data sources, so
        # the same statement sent to another database or output location
        # does not reuse a token with changed parameters.
        def request_token(statement)
          Digest::MD5.hexdigest([statement, database, settings["output_location"]].compact.join("/"))
        end

        # Polls get_query_results until the query has finished or the data
        # source timeout has elapsed.
        #
        # Returns the response, or nil when the deadline passed first.
        # Re-raises any InvalidRequestException other than "not yet finished".
        def wait_for_results(query_execution_id)
          stop_at = Time.now + (data_source.timeout || DEFAULT_TIMEOUT)

          begin
            client.get_query_results(query_execution_id: query_execution_id)
          rescue Aws::Athena::Errors::InvalidRequestException => e
            # keep waiting for any in-progress state, not only RUNNING
            raise unless e.message.start_with?(NOT_FINISHED_PREFIX)
            return nil unless Time.now < stop_at

            sleep(POLL_INTERVAL)
            retry
          end
        end

        # Converts the string cells of +rows+ in place according to
        # +column_types+ (one type name per column). nil cells (SQL NULL) are
        # left as nil; columns of any other type stay strings.
        def cast_rows(rows, column_types)
          utc = ActiveSupport::TimeZone["Etc/UTC"]

          column_types.each_with_index do |type, i|
            # TODO more column_types
            cast =
              case type
              when "timestamp" then ->(v) { utc.parse(v) }
              when "date" then ->(v) { Date.parse(v) }
              when "tinyint", "smallint", "integer", "bigint" then ->(v) { v.to_i }
              when "float", "real", "double" then ->(v) { v.to_f }
              end
            next unless cast

            rows.each { |row| row[i] &&= cast.call(row[i]) }
          end
        end

        # Collects the table descriptions from every page of the Glue
        # get_tables response, not just the first one.
        def glue_tables
          all_tables = []
          glue.get_tables(database_name: database).each do |page|
            all_tables.concat(page.table_list)
          end
          all_tables
        end

        # Returns the state change reason Athena recorded for the query.
        def fetch_error(query_execution_id)
          client.get_query_execution(
            query_execution_id: query_execution_id
          ).query_execution.status.state_change_reason
        end

        # Memoized Athena client built with the SDK's default configuration.
        def client
          @client ||= Aws::Athena::Client.new
        end

        # Memoized Glue client built with the SDK's default configuration.
        def glue
          @glue ||= Aws::Glue::Client.new
        end
      end
    end
  end