Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 87 additions & 19 deletions app/models/loader/orchestrate.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
# frozen_string_literal: true

# Loads a policy into the database, by operating on a PolicyVersion which has already been created with the policy id,
require 'conjur/extension_repository'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File orchestrate.rb has 300 lines of code (exceeds 250 allowed). Consider refactoring.

require 'conjur/extensions/policy_load'

# Loads a policy into the database, by operating on a PolicyVersion which has already been created with the policy id,
# policy text, the authenticated user, and the policy owner. The PolicyVersion also parses the policy
# and checks it for syntax errors, before this code is invoked.
#
# The algorithm works by loading the policy into a new, temporary schema (schemas are lightweight namespaces
# in Postgres). Then this "new" policy (in the temporary schema) is merged into the "old" policy (in the
# in Postgres). Then this "new" policy (in the temporary schema) is merged into the "old" policy (in the
# primary schema). The merge algorithm proceeds in distinct phases:
#
# 1) Records which exist in the "old" policy but not in the "new" policy are deleted from the "old" policy.
Expand Down Expand Up @@ -35,7 +38,7 @@
# All steps occur within a transaction, so that if any errors occur (e.g. a role or permission grant which references
# a non-existent role or resource), the entire operation is rolled back.
#
# Future: Note that it is also possible to skip step (1) (deletion of records from the "old" policy which are not defined in the
# Future: Note that it is also possible to skip step (1) (deletion of records from the "old" policy which are not defined in the
# "new"). This "safe" mode can be operationally important, because the presence of cascading foreign key constraints in the schema
# means that many records can potentially be deleted as a consequence of deleting an important "root"-ish record. For
# example, deleting the "admin" role will most likely cascade to delete all records in the database.
Expand All @@ -61,9 +64,13 @@ class Orchestrate
annotations: [ :resource_id, :name, :value ]
}

def initialize policy_version
def initialize(
policy_version,
extension_repository: Conjur::ExtensionRepository.new
)
@policy_version = policy_version
@schemata = Schemata.new
@extensions = extension_repository.get(Conjur::Extensions::PolicyLoad)

# Transform each statement into a Loader type
@create_records = policy_version.create_records.map do |policy_object|
Expand All @@ -73,13 +80,16 @@ def initialize policy_version
Loader::Types.wrap(policy_object, self)
end
end

# Gets the id of the policy being loaded.
def policy_id
policy_version.policy.id
end

def setup_db_for_new_policy
# We use the db setup as a proxy for before a policy load
@extensions.call(:before_load_policy, policy_version: @policy_version)

perform_deletion

create_schema
Expand Down Expand Up @@ -149,6 +159,12 @@ def table_data account, schema = ""
# Matching rows are selected by primary keys only, using a LEFT JOIN between the
# existing policy and the new policy.
def delete_removed
@extensions.call(
:before_delete,
policy_version: @policy_version,
schema_name: schema_name
)

TABLES.each do |table|
columns = Array(model_for_table(table).primary_key) + [ :policy_id ]

Expand All @@ -171,6 +187,12 @@ def comparisons table, columns, existing_alias, new_alias
WHERE #{comparisons(table, columns, "#{primary_schema}.", 'deleted_from_')}
DELETE
end

@extensions.call(
:after_delete,
policy_version: @policy_version,
schema_name: schema_name
)
end

# Delete rows from the new policy which are already present in another policy.
Expand Down Expand Up @@ -216,6 +238,12 @@ def eliminate_duplicates table, columns
end

def update_changed
@extensions.call(
:before_update,
policy_version: @policy_version,
schema_name: schema_name
)

in_primary_schema do
TABLES.each do |table|
pk_columns = Array(Sequel::Model(table).primary_key)
Expand All @@ -238,23 +266,41 @@ def update_changed
UPDATE
end
end

@extensions.call(
:after_update,
policy_version: @policy_version,
schema_name: schema_name
)
end

# Copy all remaining records in the new schema into the master schema.
def insert_new
@extensions.call(
:before_insert,
policy_version: @policy_version,
schema_name: schema_name
)

@new_roles = ::Role.all

in_primary_schema do
disable_policy_log_trigger
TABLES.each { |table| insert_table_records(table) }
enable_policy_log_trigger
end

@extensions.call(
:after_insert,
policy_version: @policy_version,
schema_name: schema_name
)
end

def insert_table_records(table)
columns = (TABLE_EQUIVALENCE_COLUMNS[table] + [ :policy_id ]).join(", ")
columns = (TABLE_EQUIVALENCE_COLUMNS[table] + [ :policy_id ]).join(", ")
db.run("INSERT INTO #{table} ( #{columns} ) SELECT #{columns} FROM #{schema_name}.#{table}")

# For large policies, the policy logging triggers occupy the majority
# of the policy load time. To make this more efficient on the initial
# load, we disable the triggers and update the policy log in bulk.
Expand All @@ -263,7 +309,7 @@ def insert_table_records(table)

def disable_policy_log_trigger
# To disable the triggers during the bulk load we use a local
# configuration setting that the trigger function is aware of.
# configuration setting that the trigger function is aware of.
# When we set this variable to `true`, then the trigger will
# observe the setting value and skip its own policy log.
db.run('SET LOCAL conjur.skip_insert_policy_log_trigger = true')
Expand All @@ -277,10 +323,10 @@ def insert_policy_log_records(table)
primary_key_columns = Array(Sequel::Model(table).primary_key).map(&:to_s).pg_array
db.run(<<-POLICY_LOG)
INSERT INTO policy_log(
policy_id,
policy_id,
version,
operation,
kind,
operation,
kind,
subject)
SELECT
(policy_log_record(
Expand All @@ -305,12 +351,24 @@ def schema_name

# Perform explicitly requested deletions
def perform_deletion
@extensions.call(
:before_delete,
policy_version: @policy_version,
schema_name: schema_name
)

delete_records.map(&:delete!)

@extensions.call(
:after_delete,
policy_version: @policy_version,
schema_name: schema_name
)
end

# Loads the records into the temporary schema (since the schema search path contains only the temporary schema).
#
#
#
def load_records
raise "Policy version must be saved before loading" unless policy_version.resource_id

Expand Down Expand Up @@ -351,8 +409,8 @@ def create_schema
CREATE OR REPLACE FUNCTION account(id text) RETURNS text
LANGUAGE sql IMMUTABLE
AS $$
SELECT CASE
WHEN split_part($1, ':', 1) = '' THEN NULL
SELECT CASE
WHEN split_part($1, ':', 1) = '' THEN NULL
ELSE split_part($1, ':', 1)
END
$$;
Expand All @@ -362,6 +420,13 @@ def create_schema
db.execute("ALTER TABLE roles ADD PRIMARY KEY ( role_id )")

db.execute("ALTER TABLE role_memberships ALTER COLUMN admin_option SET DEFAULT 'f'")

# We use the db setup as a proxy for before a policy load
@extensions.call(
:after_create_schema,
policy_version: @policy_version,
schema_name: schema_name
)
end

# Drops the temporary schema and everything remaining in it. Also reset the schema search path.
Expand All @@ -373,10 +438,10 @@ def drop_schema
def db
Sequel::Model.db
end
# PostgreSQL has many types of caches, one of which is the "catalog cache".
# When a connection is established to the database, this cache is initialized alongside it, and persists for the duration of the connection.
# This cache contains references to Database Objects, such as indexes, etc. (not data records themselves).

# PostgreSQL has many types of caches, one of which is the "catalog cache".
# When a connection is established to the database, this cache is initialized alongside it, and persists for the duration of the connection.
# This cache contains references to Database Objects, such as indexes, etc. (not data records themselves).
# This cache is not cleaned up by the system automatically. However, if the connection is disconnected, the cache is dumped.
# further reading: Postgres community email thread: https://www.postgresql.org/message-id/flat/20161219.201505.11562604.horiguchi.kyotaro@lab.ntt.co.jp.
# The default connection pool does not support closing connections.We must be able to close connections on demand
Expand All @@ -385,6 +450,9 @@ def db
# [docs](https://www.rubydoc.info/github/jeremyevans/sequel/Sequel/ShardedThreadedConnectionPool)
def release_db_connection
Sequel::Model.db.disconnect
end

# We use releasing the db connection as a proxy for after a policy load
@extensions.call(:after_load_policy, policy_version: policy_version)
end
end
end
24 changes: 20 additions & 4 deletions config/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,15 @@
$LOAD_PATH.push(File.expand_path("../../engines/conjur_audit/lib", __FILE__))
require 'conjur_audit'

# Must require because lib folder hasn't been loaded yet
require './lib/conjur/conjur_config'
# Ensure we can require application source files from the extensions before
# auto-loading is ready.
$LOAD_PATH.push(File.expand_path("../../", __FILE__))
$LOAD_PATH.push(File.expand_path("../../lib", __FILE__))

# Must require because lib folder hasn't been auto-loaded yet
require 'conjur/conjur_config'
require 'conjur/extension_repository'
require 'conjur/extensions/server_lifecycle'

module Conjur
class Application < Rails::Application
Expand All @@ -47,15 +54,15 @@ class Application < Rails::Application
Sequel.extension(:core_extensions, :postgres_schemata)
Sequel::Model.db.extension(:pg_array, :pg_inet)
end

#The default connection pool does not support closing connections.
# We must be able to close connections on demand to clear the connection cache
# after policy loads [cyberark/conjur#2584](https://github.com/cyberark/conjur/pull/2584)
# The [ShardedThreadedConnectionPool](https://www.rubydoc.info/github/jeremyevans/sequel/Sequel/ShardedThreadedConnectionPool) does support closing connections on-demand.
# Sequel is configured to use the ShardedThreadedConnectionPool by setting the servers configuration on
# the database connection [docs](https://www.rubydoc.info/github/jeremyevans/sequel/Sequel%2FShardedThreadedConnectionPool:servers)
config.sequel.servers = {}

config.encoding = "utf-8"
config.active_support.escape_html_entities_in_json = true

Expand All @@ -82,5 +89,14 @@ class Application < Rails::Application
# We create this in application.rb instead of an initializer so that it's
# guaranteed to be available for other initializers to use.
config.conjur_config = Conjur::ConjurConfig.new

# Signal lifeycle extensions that the application server is about to start
extension_repository = Conjur::ExtensionRepository.new(
logger: Logger.new(STDOUT)
)
lifecycle_extensions = extension_repository.get(
Conjur::Extensions::ServerLifecycle
)
lifecycle_extensions.call(:before_server_start)
end
end
Loading