From 8f1fc119a5d3b020f3ab1810d3a1ef4b5887bd15 Mon Sep 17 00:00:00 2001 From: Jan Maly Date: Fri, 26 Nov 2021 11:06:12 +0100 Subject: [PATCH 01/14] Updated client creation to support SRV. Updated discovery function to list databases for a setup with no roles. --- setup.py | 1 + tap_mongodb/__init__.py | 72 ++++++++++++++++++++++++++--------------- 2 files changed, 47 insertions(+), 26 deletions(-) diff --git a/setup.py b/setup.py index 1e31ae0..e178845 100644 --- a/setup.py +++ b/setup.py @@ -14,6 +14,7 @@ 'pymongo==3.8.0', 'tzlocal==2.0.0', 'terminaltables==3.1.0', + 'dnspython==2.1.0' ], extras_require={ 'dev': [ diff --git a/tap_mongodb/__init__.py b/tap_mongodb/__init__.py index 035dcb6..311ee29 100644 --- a/tap_mongodb/__init__.py +++ b/tap_mongodb/__init__.py @@ -3,27 +3,23 @@ import json import ssl import sys -import time -import pymongo -from bson import timestamp +import pymongo import singer from singer import metadata, metrics, utils import tap_mongodb.sync_strategies.common as common import tap_mongodb.sync_strategies.full_table as full_table -import tap_mongodb.sync_strategies.oplog as oplog import tap_mongodb.sync_strategies.incremental as incremental - +import tap_mongodb.sync_strategies.oplog as oplog LOGGER = singer.get_logger() REQUIRED_CONFIG_KEYS = [ 'host', - 'port', 'user', 'password', - 'database' + 'use_dns_seed_list' ] IGNORE_DBS = ['system', 'local', 'config'] @@ -103,17 +99,20 @@ def get_roles(client, config): roles.append(sub_role) return roles + def get_databases(client, config): roles = get_roles(client, config) LOGGER.info('Roles: %s', roles) - can_read_all = len([r for r in roles if r['role'] in ROLES_WITH_ALL_DB_FIND_PRIVILEGES]) > 0 + can_read_all = any([r['role'] in ROLES_WITH_ALL_DB_FIND_PRIVILEGES for r in roles]) - if can_read_all: + # TODO - adjusted condition to cover the case with no roles + if can_read_all or len(roles) == 0: db_names = [d for d in client.list_database_names() if d not in IGNORE_DBS] else: db_names = 
[r['db'] for r in roles if r['db'] not in IGNORE_DBS] db_names = list(set(db_names)) # Make sure each db is only in the list once + LOGGER.info('Datbases: %s', db_names) return db_names @@ -179,7 +178,7 @@ def do_discover(client, config): db_name, collection_name) streams.append(produce_collection_schema(collection)) - json.dump({'streams' : streams}, sys.stdout, indent=2) + json.dump({'streams': streams}, sys.stdout, indent=2) def is_stream_selected(stream): @@ -191,7 +190,6 @@ def is_stream_selected(stream): def get_streams_to_sync(streams, state): - # get selected streams selected_streams = [s for s in streams if is_stream_selected(s)] # prioritize streams that have not been processed @@ -214,7 +212,7 @@ def get_streams_to_sync(streams, state): lambda s: s['tap_stream_id'] == currently_syncing, ordered_streams)) non_currently_syncing_streams = list(filter(lambda s: s['tap_stream_id'] - != currently_syncing, + != currently_syncing, ordered_streams)) streams_to_sync = currently_syncing_stream + non_currently_syncing_streams @@ -230,6 +228,7 @@ def write_schema_message(stream): schema=stream['schema'], key_properties=['_id'])) + def load_stream_projection(stream): md_map = metadata.to_map(stream['metadata']) stream_projection = metadata.get(md_map, (), 'tap-mongodb.projection') @@ -246,10 +245,11 @@ def load_stream_projection(stream): if stream_projection and stream_projection.get('_id') == 0: raise common.InvalidProjectionException( "Projection blacklists key property id for collection {}" \ - .format(stream['tap_stream_id'])) + .format(stream['tap_stream_id'])) return stream_projection + def clear_state_on_replication_change(stream, state): md_map = metadata.to_map(stream['metadata']) tap_stream_id = stream['tap_stream_id'] @@ -276,6 +276,7 @@ def clear_state_on_replication_change(stream, state): return state + def sync_stream(client, stream, state): tap_stream_id = stream['tap_stream_id'] @@ -284,7 +285,6 @@ def sync_stream(client, stream, state): 
common.SCHEMA_COUNT[tap_stream_id] = 0 common.SCHEMA_TIMES[tap_stream_id] = 0 - md_map = metadata.to_map(stream['metadata']) replication_method = metadata.get(md_map, (), 'replication-method') database_name = metadata.get(md_map, (), 'database-name') @@ -358,20 +358,39 @@ def main_impl(): verify_mode = config.get('verify_mode', 'true') == 'true' use_ssl = config.get('ssl') == 'true' - connection_params = {"host": config['host'], - "port": int(config['port']), - "username": config.get('user', None), - "password": config.get('password', None), - "authSource": config['database'], - "ssl": use_ssl, - "replicaset": config.get('replica_set', None), - "readPreference": 'secondaryPreferred'} + # Use DNS Seed List + use_dns_seed_list = config.get('use_dns_seed_list') == 'true' + + # create the connection + if use_dns_seed_list: + connection_params = { + "host": config['host'], + "port": int(config.get('port', 27017)), + "username": config.get('user', None), + "password": config.get('password', None), + "authSource": config.get('database', None), + "ssl": use_ssl, + "replicaset": config.get('replica_set', None), + "readPreference": 'secondaryPreferred' + } - # NB: "ssl_cert_reqs" must ONLY be supplied if `SSL` is true. - if not verify_mode and use_ssl: - connection_params["ssl_cert_reqs"] = ssl.CERT_NONE + # NB: "ssl_cert_reqs" must ONLY be supplied if `SSL` is true. 
+ if not verify_mode and use_ssl: + connection_params["ssl_cert_reqs"] = ssl.CERT_NONE - client = pymongo.MongoClient(**connection_params) + client = pymongo.MongoClient(**connection_params) + else: + # TODO - does not take all parameters into account -> just our case + if 'user' in config: + # resolve user's credentials + if 'password' in config: + credentials = '{}:{}'.format(config['user'], config['password']) + else: + credentials = config['user'] + connection_url = "mongodb+srv://{}@{}".format(credentials, config['host']) + else: + connection_url = "mongodb+srv://{}".format(config['host']) + client = pymongo.MongoClient(connection_url) LOGGER.info('Connected to MongoDB host: %s, version: %s', config['host'], @@ -386,6 +405,7 @@ def main_impl(): state = args.state or {} do_sync(client, args.catalog.to_dict(), state) + def main(): try: main_impl() From 63a559dafefff94faa51ecaa11ca1fa5028a11dd Mon Sep 17 00:00:00 2001 From: Jan Maly Date: Fri, 26 Nov 2021 11:49:54 +0100 Subject: [PATCH 02/14] Adjusted README to describe new parameters. Updated connection configuration and method to get databases so it can run without database parameter. 
--- README.md | 8 ++++---- tap_mongodb/__init__.py | 38 ++++++++++++++++---------------------- 2 files changed, 20 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 088b748..44191d0 100644 --- a/README.md +++ b/README.md @@ -20,17 +20,17 @@ Create json file called `config.json`, with the following contents: { "password": "", "user": "", - "host": "", - "port": "", - "database": "" + "host": "" } ``` The folowing parameters are optional for your config file: | Name | Type | Description | | -----|------|------------ | -| `replica_set` | string | name of replica set | +| `port` | integer | port number (default is 27017) | +| `database` | string | name of the database | |`ssl` | Boolean | can be set to true to connect using ssl | +|`srv` | Boolean | use DNS Seed List connection string - [link](https://docs.mongodb.com/manual/reference/connection-string/) | | `include_schema_in_destination_stream_name` | Boolean | forces the stream names to take the form `_` instead of ``| All of the above attributes are required by the tap to connect to your mongo instance. 
diff --git a/tap_mongodb/__init__.py b/tap_mongodb/__init__.py index 311ee29..0d73cf1 100644 --- a/tap_mongodb/__init__.py +++ b/tap_mongodb/__init__.py @@ -18,8 +18,7 @@ REQUIRED_CONFIG_KEYS = [ 'host', 'user', - 'password', - 'use_dns_seed_list' + 'password' ] IGNORE_DBS = ['system', 'local', 'config'] @@ -101,18 +100,21 @@ def get_roles(client, config): def get_databases(client, config): - roles = get_roles(client, config) - LOGGER.info('Roles: %s', roles) - - can_read_all = any([r['role'] in ROLES_WITH_ALL_DB_FIND_PRIVILEGES for r in roles]) - - # TODO - adjusted condition to cover the case with no roles - if can_read_all or len(roles) == 0: + if 'database' not in config: + # handle the case when no database is provided - read all databases + LOGGER.info('No Roles loaded') db_names = [d for d in client.list_database_names() if d not in IGNORE_DBS] else: - db_names = [r['db'] for r in roles if r['db'] not in IGNORE_DBS] - db_names = list(set(db_names)) # Make sure each db is only in the list once + # take roles into consideration + roles = get_roles(client, config) + LOGGER.info('Roles: %s', roles) + can_read_all = any([r['role'] in ROLES_WITH_ALL_DB_FIND_PRIVILEGES for r in roles]) + if can_read_all: + db_names = [d for d in client.list_database_names() if d not in IGNORE_DBS] + else: + db_names = [r['db'] for r in roles if r['db'] not in IGNORE_DBS] + db_names = list(set(db_names)) # Make sure each db is only in the list once LOGGER.info('Datbases: %s', db_names) return db_names @@ -359,10 +361,10 @@ def main_impl(): use_ssl = config.get('ssl') == 'true' # Use DNS Seed List - use_dns_seed_list = config.get('use_dns_seed_list') == 'true' + srv = config.get('srv') == 'true' # create the connection - if use_dns_seed_list: + if srv: connection_params = { "host": config['host'], "port": int(config.get('port', 27017)), @@ -381,15 +383,7 @@ def main_impl(): client = pymongo.MongoClient(**connection_params) else: # TODO - does not take all parameters into account -> 
just our case - if 'user' in config: - # resolve user's credentials - if 'password' in config: - credentials = '{}:{}'.format(config['user'], config['password']) - else: - credentials = config['user'] - connection_url = "mongodb+srv://{}@{}".format(credentials, config['host']) - else: - connection_url = "mongodb+srv://{}".format(config['host']) + connection_url = "mongodb+srv://{}:{}@{}".format(config['user'], config['password'], config['host']) client = pymongo.MongoClient(connection_url) LOGGER.info('Connected to MongoDB host: %s, version: %s', From 8ffc9b020b5f1070fe574e196d29d01f3dd60043 Mon Sep 17 00:00:00 2001 From: Jan Maly Date: Fri, 26 Nov 2021 18:31:32 +0100 Subject: [PATCH 03/14] Adjusted README to describe setup for Meltano --- README.md | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- setup.py | 48 +++++++++++++++--------------- 2 files changed, 111 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 44191d0..d89380b 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # tap-mongodb -This is a [Singer](https://singer.io) tap that produces JSON-formatted data following the [Singer spec](https://github.com/singer-io/getting-started/blob/master/SPEC.md) from a MongoDB source. +This is a [Singer](https://singer.io) tap that produces JSON-formatted data following the [Singer spec](https://github.com/singer-io/getting-started/blob/master/SPEC.md) from a MongoDB source. +This plugin extends the original [implementation](https://github.com/singer-io/tap-mongodb) by introducing support for **srv** mode. 
## Set up Virtual Environment ``` @@ -133,6 +134,91 @@ The tap will write bookmarks to stdout which can be captured and passed as an op ### Local MongoDB Setup If you haven't yet set up a local mongodb client, follow [these instructions](https://github.com/singer-io/tap-mongodb/blob/master/spikes/local_mongo_setup.md) +## Use as a Meltano plugin +To use this tap in Meltano, the `meltano.yml` configuration file needs to be extended with a definition of a **custom plugin** as follows: + +``` +plugins: + extractors: + - name: tap-mongodb + label: MongoDB + description: General purpose, document-based, distributed database + namespace: tap_mongodb + variants: + - name: strv + repo: https://github.com/strvcom/tap-mongodb + pip_url: git+https://github.com/strvcom/tap-mongodb.git + executable: tap-mongodb + capabilities: + - catalog + - discover + - state + settings_group_validation: + - [ 'host', 'user', 'password'] + settings: + - name: host + label: Host URL + value: localhost + - name: port + kind: integer + value: 27017 + - name: user + - name: password + kind: password + - name: database + label: Database Name + - name: replica_set + - name: ssl + kind: boolean + value: false + value_post_processor: stringify + label: SSL + - name: verify_mode + kind: boolean + value: true + value_post_processor: stringify + description: SSL Verify Mode + - name: srv + kind: boolean + value: true + description: Use DNS Seed List connection string + - name: include_schemas_in_destination_stream_name + kind: boolean + value: false + description: Forces the stream names to take the form `<database_name>_<collection_name>` instead of `<collection_name>` +``` + +To configure the plugin, follow the [official guide](https://hub.meltano.com/extractors/mongodb.html). +The official guide and plugin do not support **srv** mode; to run this plugin in that mode, extend the configuration file with the following: + +``` +plugins: + extractors: + - name: tap-mongodb + ...
+ config: + host: + user: + srv: true +``` + +The passwords should be set as described in the original guide. + +**Note**: An extractor configuration needs to be defined for this plugin to work. The configuration is described in the [official guide](https://meltano.com/docs/getting-started.html#add-an-extractor-to-pull-data-from-a-source). +A simple example of additional configuration is the following extension of configuration file: + +``` +plugins: + extractors: + - name: tap-mongodb + ... + config: + ... + metadata: + '*': + replication-method: FULL_TABLE +``` + --- Copyright © 2019 Stitch diff --git a/setup.py b/setup.py index e178845..74c6cd5 100644 --- a/setup.py +++ b/setup.py @@ -2,31 +2,31 @@ from setuptools import setup -setup(name='tap-mongodb', - version='2.0.1', - description='Singer.io tap for extracting data from MongoDB', - author='Stitch', - url='https://singer.io', - classifiers=['Programming Language :: Python :: 3 :: Only'], - py_modules=['tap_mongodb'], - install_requires=[ - 'singer-python==5.8.0', - 'pymongo==3.8.0', - 'tzlocal==2.0.0', - 'terminaltables==3.1.0', - 'dnspython==2.1.0' - ], - extras_require={ - 'dev': [ - 'pylint', - 'nose', - 'ipdb' - ] - }, - entry_points=''' +setup( + name='tap-mongodb', + version='2.0.1', + description='Singer.io tap for extracting data from MongoDB', + author='Stitch', + url='https://singer.io', + classifiers=['Programming Language :: Python :: 3 :: Only'], + py_modules=['tap_mongodb'], + install_requires=[ + 'singer-python==5.8.0', + 'pymongo==3.8.0', + 'tzlocal==2.0.0', + 'terminaltables==3.1.0', + 'dnspython==2.1.0' # to support srv + ], + extras_require={ + 'dev': [ + 'pylint', + 'nose', + 'ipdb' + ] + }, + entry_points=''' [console_scripts] tap-mongodb=tap_mongodb:main ''', - packages=['tap_mongodb', 'tap_mongodb.sync_strategies'], - + packages=['tap_mongodb', 'tap_mongodb.sync_strategies'], ) From c4f93457ecb3da40f2fddd8231ec7dcc78c422b0 Mon Sep 17 00:00:00 2001 From: niek mereu Date: Wed, 1 Dec 2021 
18:36:28 +0100 Subject: [PATCH 04/14] add hack to full table synch --- tap_mongodb/sync_strategies/full_table.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tap_mongodb/sync_strategies/full_table.py b/tap_mongodb/sync_strategies/full_table.py index 6934757..4dc10df 100644 --- a/tap_mongodb/sync_strategies/full_table.py +++ b/tap_mongodb/sync_strategies/full_table.py @@ -111,10 +111,12 @@ def sync_collection(client, stream, state, projection): schema_build_start_time = time.time() if common.row_to_schema(schema, row): - singer.write_message(singer.SchemaMessage( - stream=common.calculate_destination_stream_name(stream), - schema=schema, - key_properties=['_id'])) + if "schema" not in stream: + + singer.write_message(singer.SchemaMessage( + stream=common.calculate_destination_stream_name(stream), + schema=schema, + key_properties=['_id'])) common.SCHEMA_COUNT[stream['tap_stream_id']] += 1 common.SCHEMA_TIMES[stream['tap_stream_id']] += time.time() - schema_build_start_time From 5c1e8d60b2ace4dc9ffd1938772bd6b1859dd382 Mon Sep 17 00:00:00 2001 From: niek mereu Date: Wed, 1 Dec 2021 19:55:09 +0100 Subject: [PATCH 05/14] change back --- tap_mongodb/sync_strategies/full_table.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tap_mongodb/sync_strategies/full_table.py b/tap_mongodb/sync_strategies/full_table.py index 4dc10df..5a9126c 100644 --- a/tap_mongodb/sync_strategies/full_table.py +++ b/tap_mongodb/sync_strategies/full_table.py @@ -111,9 +111,8 @@ def sync_collection(client, stream, state, projection): schema_build_start_time = time.time() if common.row_to_schema(schema, row): - if "schema" not in stream: - singer.write_message(singer.SchemaMessage( + singer.write_message(singer.SchemaMessage( stream=common.calculate_destination_stream_name(stream), schema=schema, key_properties=['_id'])) From b694e71d16b8ce9df043d07bae3e093b242a2db1 Mon Sep 17 00:00:00 2001 From: niekstrv 
<88375778+niekstrv@users.noreply.github.com> Date: Fri, 3 Dec 2021 10:13:56 +0100 Subject: [PATCH 06/14] Update setup.py --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 74c6cd5..3b84b5e 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ py_modules=['tap_mongodb'], install_requires=[ 'singer-python==5.8.0', - 'pymongo==3.8.0', + 'pymongo==3.12.2', 'tzlocal==2.0.0', 'terminaltables==3.1.0', 'dnspython==2.1.0' # to support srv From 33de5a9d19399fbfbd1e695e74078b1f05a41937 Mon Sep 17 00:00:00 2001 From: niek mereu Date: Fri, 3 Dec 2021 14:34:01 +0100 Subject: [PATCH 07/14] update commong.py --- tap_mongodb/sync_strategies/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_mongodb/sync_strategies/common.py b/tap_mongodb/sync_strategies/common.py index 1eaa18c..68d9522 100644 --- a/tap_mongodb/sync_strategies/common.py +++ b/tap_mongodb/sync_strategies/common.py @@ -289,7 +289,7 @@ def row_to_schema(schema, row): # get pointer to field's anyOf list if not schema.get('properties', {}).get(field): - schema['properties'][field] = {'anyOf': [{}]} + schema['properties'][field] = {'anyOf': [{"type": "null"}]} anyof_schema = schema['properties'][field]['anyOf'] # add value's schema to anyOf list From 94ce851e1d0f1f138d38df1c7667d8707a03dab8 Mon Sep 17 00:00:00 2001 From: niek mereu Date: Fri, 3 Dec 2021 14:56:44 +0100 Subject: [PATCH 08/14] break loop --- tap_mongodb/sync_strategies/common.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tap_mongodb/sync_strategies/common.py b/tap_mongodb/sync_strategies/common.py index 68d9522..dca0afa 100644 --- a/tap_mongodb/sync_strategies/common.py +++ b/tap_mongodb/sync_strategies/common.py @@ -295,6 +295,8 @@ def row_to_schema(schema, row): # add value's schema to anyOf list changed = add_to_any_of(anyof_schema, value) or changed + break + return changed def get_sync_summary(catalog): From 68e5c15cf85bff20df447a3c5e093f9a5181bba2 Mon Sep 17 
00:00:00 2001 From: niek mereu Date: Fri, 3 Dec 2021 15:26:36 +0100 Subject: [PATCH 09/14] comment out create schema from row --- tap_mongodb/sync_strategies/full_table.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tap_mongodb/sync_strategies/full_table.py b/tap_mongodb/sync_strategies/full_table.py index 5a9126c..afe5ec6 100644 --- a/tap_mongodb/sync_strategies/full_table.py +++ b/tap_mongodb/sync_strategies/full_table.py @@ -110,13 +110,13 @@ def sync_collection(client, stream, state, projection): rows_saved += 1 schema_build_start_time = time.time() - if common.row_to_schema(schema, row): + # if common.row_to_schema(schema, row): - singer.write_message(singer.SchemaMessage( - stream=common.calculate_destination_stream_name(stream), - schema=schema, - key_properties=['_id'])) - common.SCHEMA_COUNT[stream['tap_stream_id']] += 1 + # singer.write_message(singer.SchemaMessage( + # stream=common.calculate_destination_stream_name(stream), + # schema=schema, + # key_properties=['_id'])) + # common.SCHEMA_COUNT[stream['tap_stream_id']] += 1 common.SCHEMA_TIMES[stream['tap_stream_id']] += time.time() - schema_build_start_time record_message = common.row_to_singer_record(stream, From 88f28da01ea36bbe0d48e2158db3b6e8dab32715 Mon Sep 17 00:00:00 2001 From: niek mereu Date: Fri, 3 Dec 2021 17:04:29 +0100 Subject: [PATCH 10/14] ad hoc fix videos --- tap_mongodb/sync_strategies/full_table.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tap_mongodb/sync_strategies/full_table.py b/tap_mongodb/sync_strategies/full_table.py index afe5ec6..6dcb412 100644 --- a/tap_mongodb/sync_strategies/full_table.py +++ b/tap_mongodb/sync_strategies/full_table.py @@ -27,6 +27,8 @@ def sync_collection(client, stream, state, projection): db = client[database_name] collection = db[stream['stream']] + collection.update_many({}, {'$unset': {'isPublished': ''}}) + collection.update_many({}, {'$unset': {'isDeleted': ''}}) #before writing the table version to 
state, check if we had one to begin with first_run = singer.get_bookmark(state, stream['tap_stream_id'], 'version') is None From a8810dde169ffbcb588c06e9e1c8bbcb5071991b Mon Sep 17 00:00:00 2001 From: niek mereu Date: Mon, 6 Dec 2021 10:46:56 +0100 Subject: [PATCH 11/14] add parameter for dropping of fields --- tap_mongodb/__init__.py | 14 +++++++++----- tap_mongodb/sync_strategies/full_table.py | 9 ++++++--- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/tap_mongodb/__init__.py b/tap_mongodb/__init__.py index 0d73cf1..51367f9 100644 --- a/tap_mongodb/__init__.py +++ b/tap_mongodb/__init__.py @@ -279,7 +279,7 @@ def clear_state_on_replication_change(stream, state): return state -def sync_stream(client, stream, state): +def sync_stream(client, stream, state, fields_to_drop): tap_stream_id = stream['tap_stream_id'] common.COUNTS[tap_stream_id] = 0 @@ -328,7 +328,7 @@ def sync_stream(client, stream, state): oplog.sync_collection(client, stream, state, stream_projection) elif replication_method == 'FULL_TABLE': - full_table.sync_collection(client, stream, state, stream_projection) + full_table.sync_collection(client, stream, state, stream_projection, fields_to_drop) elif replication_method == 'INCREMENTAL': incremental.sync_collection(client, stream, state, stream_projection) @@ -342,12 +342,12 @@ def sync_stream(client, stream, state): singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) -def do_sync(client, catalog, state): +def do_sync(client, catalog, state, fields_to_drop): all_streams = catalog['streams'] streams_to_sync = get_streams_to_sync(all_streams, state) for stream in streams_to_sync: - sync_stream(client, stream, state) + sync_stream(client, stream, state, fields_to_drop) LOGGER.info(common.get_sync_summary(catalog)) @@ -363,6 +363,10 @@ def main_impl(): # Use DNS Seed List srv = config.get('srv') == 'true' + # if no dropping fields specified, create empty list + if not 'fields_to_drop' in list(config.keys()): + 
config['fields_to_drop'] = [] + # create the connection if srv: connection_params = { @@ -397,7 +401,7 @@ def main_impl(): do_discover(client, config) elif args.catalog: state = args.state or {} - do_sync(client, args.catalog.to_dict(), state) + do_sync(client, args.catalog.to_dict(), state, fields_to_drop) def main(): diff --git a/tap_mongodb/sync_strategies/full_table.py b/tap_mongodb/sync_strategies/full_table.py index 6dcb412..885abc3 100644 --- a/tap_mongodb/sync_strategies/full_table.py +++ b/tap_mongodb/sync_strategies/full_table.py @@ -17,8 +17,10 @@ def get_max_id_value(collection): return None + + # pylint: disable=too-many-locals,invalid-name,too-many-statements -def sync_collection(client, stream, state, projection): +def sync_collection(client, stream, state, projection, fields_to_drop): tap_stream_id = stream['tap_stream_id'] LOGGER.info('Starting full table sync for %s', tap_stream_id) @@ -27,8 +29,9 @@ def sync_collection(client, stream, state, projection): db = client[database_name] collection = db[stream['stream']] - collection.update_many({}, {'$unset': {'isPublished': ''}}) - collection.update_many({}, {'$unset': {'isDeleted': ''}}) + + for field in fields_to_drop: + collection.update_many({}, {'$unset': {field: ''}}) #before writing the table version to state, check if we had one to begin with first_run = singer.get_bookmark(state, stream['tap_stream_id'], 'version') is None From edf6321ded5a8859e1e6e5ce38025db21e0517f7 Mon Sep 17 00:00:00 2001 From: niek mereu Date: Mon, 6 Dec 2021 10:54:34 +0100 Subject: [PATCH 12/14] bug in __ini__.py --- tap_mongodb/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_mongodb/__init__.py b/tap_mongodb/__init__.py index 51367f9..6f0ae88 100644 --- a/tap_mongodb/__init__.py +++ b/tap_mongodb/__init__.py @@ -401,7 +401,7 @@ def main_impl(): do_discover(client, config) elif args.catalog: state = args.state or {} - do_sync(client, args.catalog.to_dict(), state, fields_to_drop) + 
do_sync(client, args.catalog.to_dict(), state, config['fields_to_drop']) def main(): From e35571b9e43d34e06daf4d529bdd0dd85d02143b Mon Sep 17 00:00:00 2001 From: niek mereu Date: Mon, 6 Dec 2021 11:01:39 +0100 Subject: [PATCH 13/14] update to incremental strategy --- tap_mongodb/sync_strategies/incremental.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/tap_mongodb/sync_strategies/incremental.py b/tap_mongodb/sync_strategies/incremental.py index 4fc533f..237e323 100644 --- a/tap_mongodb/sync_strategies/incremental.py +++ b/tap_mongodb/sync_strategies/incremental.py @@ -26,13 +26,16 @@ def update_bookmark(row, state, tap_stream_id, replication_key_name): replication_key_type) # pylint: disable=too-many-locals, too-many-statements -def sync_collection(client, stream, state, projection): +def sync_collection(client, stream, state, projection, fields_to_drop): tap_stream_id = stream['tap_stream_id'] LOGGER.info('Starting incremental sync for %s', tap_stream_id) stream_metadata = metadata.to_map(stream['metadata']).get(()) collection = client[stream_metadata['database-name']][stream['stream']] + for field in fields_to_drop: + collection.update_many({}, {'$unset': {field: ''}}) + #before writing the table version to state, check if we had one to begin with first_run = singer.get_bookmark(state, stream['tap_stream_id'], 'version') is None @@ -93,12 +96,12 @@ def sync_collection(client, stream, state, projection): for row in cursor: schema_build_start_time = time.time() - if common.row_to_schema(schema, row): - singer.write_message(singer.SchemaMessage( - stream=common.calculate_destination_stream_name(stream), - schema=schema, - key_properties=['_id'])) - common.SCHEMA_COUNT[tap_stream_id] += 1 + # if common.row_to_schema(schema, row): + # singer.write_message(singer.SchemaMessage( + # stream=common.calculate_destination_stream_name(stream), + # schema=schema, + # key_properties=['_id'])) + # common.SCHEMA_COUNT[tap_stream_id] += 1 
common.SCHEMA_TIMES[tap_stream_id] += time.time() - schema_build_start_time From 0cef7aabe0d375cc54eae33169274681cbcaf7ff Mon Sep 17 00:00:00 2001 From: niek mereu Date: Mon, 6 Dec 2021 11:05:38 +0100 Subject: [PATCH 14/14] bug fix in __init__.py --- tap_mongodb/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tap_mongodb/__init__.py b/tap_mongodb/__init__.py index 6f0ae88..d2bcfe9 100644 --- a/tap_mongodb/__init__.py +++ b/tap_mongodb/__init__.py @@ -323,7 +323,7 @@ def sync_stream(client, stream, state, fields_to_drop): collection_oplog_ts = oplog.get_latest_ts(client) oplog.update_bookmarks(state, tap_stream_id, collection_oplog_ts) - full_table.sync_collection(client, stream, state, stream_projection) + full_table.sync_collection(client, stream, state, stream_projection, fields_to_drop) oplog.sync_collection(client, stream, state, stream_projection) @@ -331,7 +331,7 @@ def sync_stream(client, stream, state, fields_to_drop): full_table.sync_collection(client, stream, state, stream_projection, fields_to_drop) elif replication_method == 'INCREMENTAL': - incremental.sync_collection(client, stream, state, stream_projection) + incremental.sync_collection(client, stream, state, stream_projection, fields_to_drop) else: raise Exception( "only FULL_TABLE, LOG_BASED, and INCREMENTAL replication \