Commit fabc359e authored by Shinya Maeda's avatar Shinya Maeda

Multithreading cluster creation is done with `reactive_cache`

parent 058e5957
class Projects::ClustersController < Projects::ApplicationController class Projects::ClustersController < Projects::ApplicationController
before_action :cluster before_action :cluster, except: [:login, :index, :new, :create]
before_action :authorize_google_api, except: [:login] before_action :authorize_google_api, except: [:login]
# before_action :authorize_admin_clusters! # TODO: Authentication # before_action :authorize_admin_clusters! # TODO: Authentication
...@@ -11,8 +11,8 @@ class Projects::ClustersController < Projects::ApplicationController ...@@ -11,8 +11,8 @@ class Projects::ClustersController < Projects::ApplicationController
end end
def index def index
if cluster if project.clusters.any?
redirect_to edit_namespace_project_cluster_path(project.namespace, project, cluster.id) redirect_to edit_namespace_project_cluster_path(project.namespace, project, project.clusters.last.id)
else else
redirect_to action: 'new' redirect_to action: 'new'
end end
...@@ -22,72 +22,36 @@ class Projects::ClustersController < Projects::ApplicationController ...@@ -22,72 +22,36 @@ class Projects::ClustersController < Projects::ApplicationController
end end
def create def create
# Create a cluster on GKE begin
operation = api_client.projects_zones_clusters_create( Ci::CreateClusterService.new(project, current_user, params)
params['gcp_project_id'], params['cluster_zone'], params['cluster_name'], .create_cluster_on_gke(api_client)
cluster_size: params['cluster_size'], machine_type: params['machine_type'] rescue Ci::CreateClusterService::UnexpectedOperationError => e
) puts "#{self.class.name} - #{__callee__}: e: #{e}"
# TODO: error
# wait_operation_done
if operation&.operation_type == 'CREATE_CLUSTER'
api_client.wait_operation_done(operation.self_link)
else
raise "TODO: ERROR"
end end
# Get cluster details (end point, etc) redirect_to action: 'index'
gke_cluster = api_client.projects_zones_clusters_get( end
params['gcp_project_id'], params['cluster_zone'], params['cluster_name']
)
# Get k8s token ##
token = '' # Return
KubernetesService.new.tap do |ks| # @status: The current status of the operation.
ks.api_url = 'https://' + gke_cluster.endpoint # @status_message: If an error has occurred, a textual description of the error.
ks.ca_pem = Base64.decode64(gke_cluster.master_auth.cluster_ca_certificate) def creation_status
ks.username = gke_cluster.master_auth.username respond_to do |format|
ks.password = gke_cluster.master_auth.password format.json do
secrets = ks.read_secrets render json: cluster.creation_status(session[GoogleApi::CloudPlatform::Client.token_in_session])
secrets.each do |secret|
name = secret.dig('metadata', 'name')
if /default-token/ =~ name
token_base64 = secret.dig('data', 'token')
token = Base64.decode64(token_base64)
break
end end
end end
end end
# Update service
kubernetes_service.attributes = service_params(
active: true,
api_url: 'https://' + gke_cluster.endpoint,
ca_pem: Base64.decode64(gke_cluster.master_auth.cluster_ca_certificate),
namespace: params['project_namespace'],
token: token
)
kubernetes_service.save!
# Save info
project.clusters.create(
gcp_project_id: params['gcp_project_id'],
cluster_zone: params['cluster_zone'],
cluster_name: params['cluster_name'],
service: kubernetes_service
)
redirect_to action: 'index'
end
def edit def edit
# TODO: If on, do we override parameter?
# TODO: If off, do we override parameter?
end end
def update def update
cluster.update(enabled: params['enabled']) cluster.update(enabled: params['enabled'])
cluster.service.update(active: params['enabled']) cluster.service.update(active: params['enabled'])
# TODO: Do we overwrite KubernetesService parameter?
render :edit render :edit
end end
...@@ -99,8 +63,7 @@ class Projects::ClustersController < Projects::ApplicationController ...@@ -99,8 +63,7 @@ class Projects::ClustersController < Projects::ApplicationController
private private
def cluster def cluster
# Each project has only one cluster, for now. In the future iteraiton, we'll support multiple clusters @cluster ||= project.clusters.find(params[:id])
@cluster ||= project.clusters.last
end end
def api_client def api_client
...@@ -112,20 +75,6 @@ class Projects::ClustersController < Projects::ApplicationController ...@@ -112,20 +75,6 @@ class Projects::ClustersController < Projects::ApplicationController
) )
end end
def kubernetes_service
@kubernetes_service ||= project.find_or_initialize_service('kubernetes')
end
def service_params(active:, api_url:, ca_pem:, namespace:, token:)
{
active: active,
api_url: api_url,
ca_pem: ca_pem,
namespace: namespace,
token: token
}
end
def authorize_google_api def authorize_google_api
unless session[GoogleApi::CloudPlatform::Client.token_in_session] unless session[GoogleApi::CloudPlatform::Client.token_in_session]
redirect_to action: 'login' redirect_to action: 'login'
......
module Ci module Ci
class Cluster < ActiveRecord::Base class Cluster < ActiveRecord::Base
extend Gitlab::Ci::Model extend Gitlab::Ci::Model
include ReactiveCaching
self.reactive_cache_key = ->(cluster) { [cluster.class.model_name.singular, cluster.project_id, cluster.id] }
belongs_to :project belongs_to :project
belongs_to :owner, class_name: 'User' belongs_to :owner, class_name: 'User'
belongs_to :service belongs_to :service
# after_save :clear_reactive_cache!
def creation_status(access_token)
with_reactive_cache(access_token) do |operation|
{
status: operation[:status],
status_message: operation[:status_message]
}
end
end
def calculate_reactive_cache(access_token)
return { status: 'INTEGRATED' } if service # If it's already done, we don't need to continue the following process
api_client = GoogleApi::CloudPlatform::Client.new(access_token, nil)
operation = api_client.projects_zones_operations(gcp_project_id, cluster_zone, gcp_operation_id)
if operation&.status == 'DONE'
# Get cluster details (end point, etc)
gke_cluster = api_client.projects_zones_clusters_get(
gcp_project_id, cluster_zone, cluster_name
)
# Get k8s token
token = ''
KubernetesService.new.tap do |ks|
ks.api_url = 'https://' + gke_cluster.endpoint
ks.ca_pem = Base64.decode64(gke_cluster.master_auth.cluster_ca_certificate)
ks.username = gke_cluster.master_auth.username
ks.password = gke_cluster.master_auth.password
secrets = ks.read_secrets
secrets.each do |secret|
name = secret.dig('metadata', 'name')
if /default-token/ =~ name
token_base64 = secret.dig('data', 'token')
token = Base64.decode64(token_base64)
break
end
end
end
# k8s endpoint, ca_cert
endpoint = 'https://' + gke_cluster.endpoint
cluster_ca_certificate = Base64.decode64(gke_cluster.master_auth.cluster_ca_certificate)
# Update service
kubernetes_service.attributes = {
active: true,
api_url: endpoint,
ca_pem: cluster_ca_certificate,
namespace: project_namespace,
token: token
}
kubernetes_service.save!
# Save info in cluster record
update(
enabled: true,
service: kubernetes_service,
username: gke_cluster.master_auth.username,
password: gke_cluster.master_auth.password,
token: token,
ca_cert: cluster_ca_certificate,
end_point: endpoint,
)
end
puts "#{self.class.name} - #{__callee__}: operation.to_json: #{operation.to_json}"
operation.to_h
end
def kubernetes_service
@kubernetes_service ||= project.find_or_initialize_service('kubernetes')
end
end end
end end
module Ci
class CreateClusterService < BaseService
UnexpectedOperationError = Class.new(StandardError)
def create_cluster_on_gke(api_client)
# Create a cluster on GKE
operation = api_client.projects_zones_clusters_create(
params['gcp_project_id'], params['cluster_zone'], params['cluster_name'],
cluster_size: params['cluster_size'], machine_type: params['machine_type']
)
if operation&.status != ('RUNNING' || 'PENDING')
raise UnexpectedOperationError
end
api_client.parse_self_link(operation.self_link).tap do |project_id, zone, operation_id|
project.clusters.create(owner: current_user,
gcp_project_id: params['gcp_project_id'],
cluster_zone: params['cluster_zone'],
cluster_name: params['cluster_name'],
project_namespace: params['project_namespace'],
gcp_operation_id: operation_id).tap do |cluster|
# Start status polling. When the operation finish, create KubernetesService.
cluster.creation_status(api_client.access_token)
end
end
end
end
end
Create a new cluster Create a new cluster
%br %br
= link_to "Create on Google Container Engine", namespace_project_clusters_path(@project.namespace, @project, cluster_name: "gke-test-creation42", gcp_project_id: 'gitlab-internal-153318', cluster_zone: 'us-central1-a', cluster_size: '1', project_namespace: 'aaa', machine_type: '???'), method: :post = link_to "Create on Google Container Engine", namespace_project_clusters_path(@project.namespace, @project, cluster_name: "gke-test-creation#{Random.rand(100)}", gcp_project_id: 'gitlab-internal-153318', cluster_zone: 'us-central1-a', cluster_size: '1', project_namespace: 'aaa', machine_type: '???'), method: :post
-# gke-test-creation#{Random.rand(100)} -# gke-test-creation#{Random.rand(100)}
...@@ -5,3 +5,4 @@ edit/show cluster ...@@ -5,3 +5,4 @@ edit/show cluster
= link_to "Enable", namespace_project_cluster_path(@project.namespace, @project, @cluster.id, enabled: 'true'), method: :put = link_to "Enable", namespace_project_cluster_path(@project.namespace, @project, @cluster.id, enabled: 'true'), method: :put
= link_to "Disable", namespace_project_cluster_path(@project.namespace, @project, @cluster.id, enabled: 'false'), method: :put = link_to "Disable", namespace_project_cluster_path(@project.namespace, @project, @cluster.id, enabled: 'false'), method: :put
= link_to "Soft-delete the cluster", namespace_project_cluster_path(@project.namespace, @project, @cluster.id), method: :delete = link_to "Soft-delete the cluster", namespace_project_cluster_path(@project.namespace, @project, @cluster.id), method: :delete
= link_to 'Check status', creation_status_namespace_project_cluster_path(@cluster.project.namespace, @cluster.project, @cluster.id), :remote => true
...@@ -187,6 +187,10 @@ constraints(ProjectUrlConstrainer.new) do ...@@ -187,6 +187,10 @@ constraints(ProjectUrlConstrainer.new) do
collection do collection do
get :login get :login
end end
member do
get :creation_status, format: :json
end
end end
resources :environments, except: [:destroy] do resources :environments, except: [:destroy] do
......
...@@ -24,6 +24,7 @@ class CreateCiClusters < ActiveRecord::Migration ...@@ -24,6 +24,7 @@ class CreateCiClusters < ActiveRecord::Migration
t.string :gcp_project_id t.string :gcp_project_id
t.string :cluster_zone t.string :cluster_zone
t.string :cluster_name t.string :cluster_name
t.string :gcp_operation_id
t.datetime_with_timezone :created_at, null: false t.datetime_with_timezone :created_at, null: false
t.datetime_with_timezone :updated_at, null: false t.datetime_with_timezone :updated_at, null: false
......
...@@ -281,6 +281,7 @@ ActiveRecord::Schema.define(version: 20170924094327) do ...@@ -281,6 +281,7 @@ ActiveRecord::Schema.define(version: 20170924094327) do
t.string "gcp_project_id" t.string "gcp_project_id"
t.string "cluster_zone" t.string "cluster_zone"
t.string "cluster_name" t.string "cluster_name"
t.string "gcp_operation_id"
t.datetime "created_at", null: false t.datetime "created_at", null: false
t.datetime "updated_at", null: false t.datetime "updated_at", null: false
end end
......
...@@ -40,7 +40,8 @@ module GoogleApi ...@@ -40,7 +40,8 @@ module GoogleApi
begin begin
operation = service.create_cluster(project_id, zone, request_body) operation = service.create_cluster(project_id, zone, request_body)
rescue Google::Apis::ClientError, Google::Apis::AuthorizationError => e rescue Google::Apis::ClientError, Google::Apis::AuthorizationError => e
Rails.logger.error("#{self.class.name}: Could not create cluster #{cluster_name}: #{e}") puts "#{self.class.name} - #{__callee__}: Could not create cluster #{cluster_name}: #{e}"
# TODO: Error
end end
puts "#{self.class.name} - #{__callee__}: operation: #{operation.inspect}" puts "#{self.class.name} - #{__callee__}: operation: #{operation.inspect}"
operation operation
...@@ -51,23 +52,14 @@ module GoogleApi ...@@ -51,23 +52,14 @@ module GoogleApi
service.authorization = access_token service.authorization = access_token
operation = service.get_zone_operation(project_id, zone, operation_id) operation = service.get_zone_operation(project_id, zone, operation_id)
puts "#{self.class.name} - #{__callee__}: operation: #{operation.inspect}"
operation operation
end end
def wait_operation_done(self_link) def parse_self_link(self_link)
running = true
ret = self_link.match(/projects\/(.*)\/zones\/(.*)\/operations\/(.*)/) ret = self_link.match(/projects\/(.*)\/zones\/(.*)\/operations\/(.*)/)
project_id = ret[1]
zone = ret[2]
operation_id = ret[3]
while running return ret[1], ret[2], ret[3] # project_id, zone, operation_id
operation = projects_zones_operations(project_id, zone, operation_id)
if operation.status != 'RUNNING'
running = false
end
end
end end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment