# athena_adapter.rb (2.9 KB)

  module Blazer
    module Adapters
      # Data source adapter that runs statements on Amazon Athena through
      # Aws::Athena::Client and converts the textual result cells into Ruby
      # values for the column types it knows about.
      class AthenaAdapter < BaseAdapter
        # Messages raised by get_query_results while the query is still in
        # progress. Athena reports QUEUED as well as RUNNING; both mean
        # "keep polling", not "failed".
        QUERY_PENDING_MESSAGES = [
          "Query has not yet finished. Current state: QUEUED",
          "Query has not yet finished. Current state: RUNNING"
        ].freeze

        # Seconds to sleep between two polls for results.
        POLL_INTERVAL = 3

        # Seconds to wait for results when the data source sets no timeout.
        DEFAULT_TIMEOUT = 300

        # Starts the statement on Athena, waits for it to finish and fetches
        # all result pages.
        #
        # @param statement [String] SQL text sent to Athena as-is
        # @param comment [String] accepted for interface compatibility;
        #   not used by this adapter
        # @return [Array] a three-element array +[columns, rows, error]+:
        #   column names, rows of (type-cast) values, and an error message
        #   or nil
        def run_statement(statement, comment)
          # Loaded lazily so the dependency is only required when Athena is used.
          require "digest/md5"

          columns = []
          rows = []
          error = nil

          begin
            query_execution_id = client.start_query_execution(
              query_string: statement,
              # use token so we fetch cached results after query is run
              client_request_token: Digest::MD5.hexdigest(statement),
              query_execution_context: {
                database: settings["database"] || "default"
              },
              result_configuration: {
                output_location: settings["output_location"]
              }
            ).query_execution_id

            resp = wait_for_results(query_execution_id)

            if resp && resp.result_set
              column_info = resp.result_set.result_set_metadata.column_info
              columns = column_info.map(&:name)

              # The response is paginated; iterate over every page.
              untyped_rows = []
              resp.each do |page|
                untyped_rows.concat(page.result_set.rows.map { |r| r.data.map(&:var_char_value) })
              end

              # The first row is dropped before casting, as in the original code
              # (presumably the header row -- confirm against the Athena API).
              rows = untyped_rows[1..-1] || []
              typecast_rows(rows, column_info.map(&:type))
            elsif resp
              # failed, get message
              execution = client.get_query_execution(
                query_execution_id: query_execution_id
              )
              error = execution.query_execution.status.state_change_reason
            else
              # wait_for_results gave up before the query finished
              error = Blazer::TIMEOUT_MESSAGE
            end
          rescue Aws::Athena::Errors::InvalidRequestException => e
            error = e.message
          end

          [columns, rows, error]
        end

        private

        # Polls get_query_results until the query has finished or the data
        # source timeout (DEFAULT_TIMEOUT when unset) has elapsed.
        #
        # @param query_execution_id [String] id returned by start_query_execution
        # @return [Object, nil] the get_query_results response, or nil on timeout
        # @raise [Aws::Athena::Errors::InvalidRequestException] for any request
        #   error other than "query still queued/running"
        def wait_for_results(query_execution_id)
          timeout = data_source.timeout || DEFAULT_TIMEOUT
          # Monotonic clock: the deadline must not move when the wall clock is adjusted.
          deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

          begin
            client.get_query_results(query_execution_id: query_execution_id)
          rescue Aws::Athena::Errors::InvalidRequestException => e
            raise unless QUERY_PENDING_MESSAGES.include?(e.message)

            if Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
              sleep(POLL_INTERVAL)
              retry
            end

            nil
          end
        end

        # Converts the string cells of +rows+ in place according to the Athena
        # column types. NULL cells (nil) are left as nil instead of being
        # coerced to 0 / 0.0 or raising inside Date.parse.
        #
        # @param rows [Array<Array>] rows of string-or-nil cells; mutated
        # @param column_types [Array<String>] Athena type name per column index
        # @return [void]
        def typecast_rows(rows, column_types)
          utc = ActiveSupport::TimeZone["Etc/UTC"]

          column_types.each_with_index do |column_type, i|
            # TODO more column_types
            cast =
              case column_type
              when "timestamp" then ->(value) { utc.parse(value) }
              when "date" then ->(value) { Date.parse(value) }
              when "bigint" then ->(value) { value.to_i }
              when "double" then ->(value) { value.to_f }
              end
            next unless cast

            rows.each { |row| row[i] &&= cast.call(row[i]) }
          end
        end

        # Lazily built, memoized Athena client.
        def client
          @client ||= Aws::Athena::Client.new
        end
      end
    end
  end