diff --git a/README.md b/README.md index 088b748..d89380b 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # tap-mongodb -This is a [Singer](https://singer.io) tap that produces JSON-formatted data following the [Singer spec](https://github.com/singer-io/getting-started/blob/master/SPEC.md) from a MongoDB source. +This is a [Singer](https://singer.io) tap that produces JSON-formatted data following the [Singer spec](https://github.com/singer-io/getting-started/blob/master/SPEC.md) from a MongoDB source. +This plugin extends the original [implementation](https://github.com/singer-io/tap-mongodb) by introducing support for **srv** mode. ## Set up Virtual Environment ``` @@ -20,17 +21,17 @@ Create json file called `config.json`, with the following contents: { "password": "", "user": "", - "host": "", - "port": "", - "database": "" + "host": "" } ``` The folowing parameters are optional for your config file: | Name | Type | Description | | -----|------|------------ | -| `replica_set` | string | name of replica set | +| `port` | integer | port number (default is 27017) | +| `database` | string | name of the database | |`ssl` | Boolean | can be set to true to connect using ssl | +|`srv` | Boolean | use DNS Seed List connection string - [link](https://docs.mongodb.com/manual/reference/connection-string/) | | `include_schema_in_destination_stream_name` | Boolean | forces the stream names to take the form `_` instead of ``| All of the above attributes are required by the tap to connect to your mongo instance. @@ -133,6 +134,91 @@ The tap will write bookmarks to stdout which can be captured and passed as an op ### Local MongoDB Setup If you haven't yet set up a local mongodb client, follow [these instructions](https://github.com/singer-io/tap-mongodb/blob/master/spikes/local_mongo_setup.md) +## Use as a Meltano plugin +To use this tap in Meltano, the configuration `meltano.yml` file needs to be extended with a definition of a **custom plugin** as follows: + +``` +plugins: + extractors: + - name: tap-mongodb + label: MongoDB + description: General purpose, document-based, distributed database + namespace: tap_mongodb + variants: + - name: strv + repo: https://github.com/strvcom/tap-mongodb + pip_url: git+https://github.com/strvcom/tap-mongodb.git + executable: tap-mongodb + capabilities: + - catalog + - discover + - state + settings_group_validation: + - [ 'host', 'user', 'password'] + settings: + - name: host + label: Host URL + value: localhost + - name: port + kind: integer + value: 27017 + - name: user + - name: password + kind: password + - name: database + label: Database Name + - name: replica_set + - name: ssl + kind: boolean + value: false + value_post_processor: stringify + label: SSL + - name: verify_mode + kind: boolean + value: true + value_post_processor: stringify + description: SSL Verify Mode + - name: srv + kind: boolean + value: true + description: Use DNS Seed List connection string + - name: include_schemas_in_destination_stream_name + kind: boolean + value: false + description: Forces the stream names to take the form `_` instead of `` +``` + +To configure the plugin, there is an [official guide](https://hub.meltano.com/extractors/mongodb.html) to follow. +The official guide & plugin does not support **srv** mode, to configure this plugin to run in this mode, extend the configuration file with the following: + +``` +plugins: + extractors: + - name: tap-mongodb + ... + config: + host: + user: + srv: true +``` + +The passwords should be set as described in the original guide. + +**Note**: An extractor configuration needs to be defined for this plugin to work. The configuration is described in the [official guide](https://meltano.com/docs/getting-started.html#add-an-extractor-to-pull-data-from-a-source). +A simple example of additional configuration is the following extension of configuration file: + +``` +plugins: + extractors: + - name: tap-mongodb + ... + config: + ... + metadata: + '*': + replication-method: FULL_TABLE +``` + --- Copyright © 2019 Stitch diff --git a/setup.py b/setup.py index 1e31ae0..3b84b5e 100644 --- a/setup.py +++ b/setup.py @@ -2,30 +2,31 @@ from setuptools import setup -setup(name='tap-mongodb', - version='2.0.1', - description='Singer.io tap for extracting data from MongoDB', - author='Stitch', - url='https://singer.io', - classifiers=['Programming Language :: Python :: 3 :: Only'], - py_modules=['tap_mongodb'], - install_requires=[ - 'singer-python==5.8.0', - 'pymongo==3.8.0', - 'tzlocal==2.0.0', - 'terminaltables==3.1.0', - ], - extras_require={ - 'dev': [ - 'pylint', - 'nose', - 'ipdb' - ] - }, - entry_points=''' +setup( + name='tap-mongodb', + version='2.0.1', + description='Singer.io tap for extracting data from MongoDB', + author='Stitch', + url='https://singer.io', + classifiers=['Programming Language :: Python :: 3 :: Only'], + py_modules=['tap_mongodb'], + install_requires=[ + 'singer-python==5.8.0', + 'pymongo==3.12.2', + 'tzlocal==2.0.0', + 'terminaltables==3.1.0', + 'dnspython==2.1.0' # to support srv + ], + extras_require={ + 'dev': [ + 'pylint', + 'nose', + 'ipdb' + ] + }, + entry_points=''' [console_scripts] tap-mongodb=tap_mongodb:main ''', - packages=['tap_mongodb', 'tap_mongodb.sync_strategies'], - + packages=['tap_mongodb', 'tap_mongodb.sync_strategies'], ) diff --git a/tap_mongodb/__init__.py b/tap_mongodb/__init__.py index 035dcb6..d2bcfe9 100644 --- a/tap_mongodb/__init__.py +++ b/tap_mongodb/__init__.py @@ -3,27 +3,22 @@ import json import ssl import sys -import time -import pymongo -from bson import timestamp +import pymongo import singer from singer import metadata, metrics, utils import tap_mongodb.sync_strategies.common as common import tap_mongodb.sync_strategies.full_table as full_table -import tap_mongodb.sync_strategies.oplog as oplog import tap_mongodb.sync_strategies.incremental as incremental - +import tap_mongodb.sync_strategies.oplog as oplog LOGGER = singer.get_logger() REQUIRED_CONFIG_KEYS = [ 'host', - 'port', 'user', - 'password', - 'database' + 'password' ] IGNORE_DBS = ['system', 'local', 'config'] @@ -103,16 +98,22 @@ def get_roles(client, config): roles.append(sub_role) return roles -def get_databases(client, config): - roles = get_roles(client, config) - LOGGER.info('Roles: %s', roles) - - can_read_all = len([r for r in roles if r['role'] in ROLES_WITH_ALL_DB_FIND_PRIVILEGES]) > 0 - if can_read_all: +def get_databases(client, config): + if 'database' not in config: + # handle the case when no database is provided - read all databases + LOGGER.info('No Roles loaded') db_names = [d for d in client.list_database_names() if d not in IGNORE_DBS] else: - db_names = [r['db'] for r in roles if r['db'] not in IGNORE_DBS] + # take roles into consideration + roles = get_roles(client, config) + LOGGER.info('Roles: %s', roles) + can_read_all = any([r['role'] in ROLES_WITH_ALL_DB_FIND_PRIVILEGES for r in roles]) + if can_read_all: + db_names = [d for d in client.list_database_names() if d not in IGNORE_DBS] + else: + db_names = [r['db'] for r in roles if r['db'] not in IGNORE_DBS] + db_names = list(set(db_names)) # Make sure each db is only in the list once LOGGER.info('Datbases: %s', db_names) return db_names @@ -179,7 +180,7 @@ def do_discover(client, config): db_name, collection_name) streams.append(produce_collection_schema(collection)) - json.dump({'streams' : streams}, sys.stdout, indent=2) + json.dump({'streams': streams}, sys.stdout, indent=2) def is_stream_selected(stream): @@ -191,7 +192,6 @@ def is_stream_selected(stream): def get_streams_to_sync(streams, state): - # get selected streams selected_streams = [s for s in streams if is_stream_selected(s)] # prioritize streams that have not been processed @@ -214,7 +214,7 @@ def get_streams_to_sync(streams, state): lambda s: s['tap_stream_id'] == currently_syncing, ordered_streams)) non_currently_syncing_streams = list(filter(lambda s: s['tap_stream_id'] - != currently_syncing, + != currently_syncing, ordered_streams)) streams_to_sync = currently_syncing_stream + non_currently_syncing_streams @@ -230,6 +230,7 @@ def write_schema_message(stream): schema=stream['schema'], key_properties=['_id'])) + def load_stream_projection(stream): md_map = metadata.to_map(stream['metadata']) stream_projection = metadata.get(md_map, (), 'tap-mongodb.projection') @@ -246,10 +247,11 @@ def load_stream_projection(stream): if stream_projection and stream_projection.get('_id') == 0: raise common.InvalidProjectionException( "Projection blacklists key property id for collection {}" \ - .format(stream['tap_stream_id'])) + .format(stream['tap_stream_id'])) return stream_projection + def clear_state_on_replication_change(stream, state): md_map = metadata.to_map(stream['metadata']) tap_stream_id = stream['tap_stream_id'] @@ -276,7 +278,8 @@ def clear_state_on_replication_change(stream, state): return state -def sync_stream(client, stream, state): + +def sync_stream(client, stream, state, fields_to_drop): tap_stream_id = stream['tap_stream_id'] common.COUNTS[tap_stream_id] = 0 @@ -284,7 +287,6 @@ def sync_stream(client, stream, state): common.SCHEMA_COUNT[tap_stream_id] = 0 common.SCHEMA_TIMES[tap_stream_id] = 0 - md_map = metadata.to_map(stream['metadata']) replication_method = metadata.get(md_map, (), 'replication-method') database_name = metadata.get(md_map, (), 'database-name') @@ -321,15 +323,15 @@ def sync_stream(client, stream, state): collection_oplog_ts = oplog.get_latest_ts(client) oplog.update_bookmarks(state, tap_stream_id, collection_oplog_ts) - full_table.sync_collection(client, stream, state, stream_projection) + full_table.sync_collection(client, stream, state, stream_projection, fields_to_drop) oplog.sync_collection(client, stream, state, stream_projection) elif replication_method == 'FULL_TABLE': - full_table.sync_collection(client, stream, state, stream_projection) + full_table.sync_collection(client, stream, state, stream_projection, fields_to_drop) elif replication_method == 'INCREMENTAL': - incremental.sync_collection(client, stream, state, stream_projection) + incremental.sync_collection(client, stream, state, stream_projection, fields_to_drop) else: raise Exception( "only FULL_TABLE, LOG_BASED, and INCREMENTAL replication \ @@ -340,12 +342,12 @@ def sync_stream(client, stream, state): singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) -def do_sync(client, catalog, state): +def do_sync(client, catalog, state, fields_to_drop): all_streams = catalog['streams'] streams_to_sync = get_streams_to_sync(all_streams, state) for stream in streams_to_sync: - sync_stream(client, stream, state) + sync_stream(client, stream, state, fields_to_drop) LOGGER.info(common.get_sync_summary(catalog)) @@ -358,20 +360,35 @@ def main_impl(): verify_mode = config.get('verify_mode', 'true') == 'true' use_ssl = config.get('ssl') == 'true' - connection_params = {"host": config['host'], - "port": int(config['port']), - "username": config.get('user', None), - "password": config.get('password', None), - "authSource": config['database'], - "ssl": use_ssl, - "replicaset": config.get('replica_set', None), - "readPreference": 'secondaryPreferred'} + # Use DNS Seed List + srv = config.get('srv') == 'true' + + # if no dropping fields specified, create empty list + if not 'fields_to_drop' in list(config.keys()): + config['fields_to_drop'] = [] + + # create the connection + if srv: + connection_params = { + "host": config['host'], + "port": int(config.get('port', 27017)), + "username": config.get('user', None), + "password": config.get('password', None), + "authSource": config.get('database', None), + "ssl": use_ssl, + "replicaset": config.get('replica_set', None), + "readPreference": 'secondaryPreferred' + } - # NB: "ssl_cert_reqs" must ONLY be supplied if `SSL` is true. - if not verify_mode and use_ssl: - connection_params["ssl_cert_reqs"] = ssl.CERT_NONE + # NB: "ssl_cert_reqs" must ONLY be supplied if `SSL` is true. + if not verify_mode and use_ssl: + connection_params["ssl_cert_reqs"] = ssl.CERT_NONE - client = pymongo.MongoClient(**connection_params) + client = pymongo.MongoClient(**connection_params) + else: + # TODO - does not take all parameters into account -> just our case + connection_url = "mongodb+srv://{}:{}@{}".format(config['user'], config['password'], config['host']) + client = pymongo.MongoClient(connection_url) LOGGER.info('Connected to MongoDB host: %s, version: %s', config['host'], @@ -384,7 +401,8 @@ def main_impl(): do_discover(client, config) elif args.catalog: state = args.state or {} - do_sync(client, args.catalog.to_dict(), state) + do_sync(client, args.catalog.to_dict(), state, config['fields_to_drop']) + def main(): try: diff --git a/tap_mongodb/sync_strategies/common.py b/tap_mongodb/sync_strategies/common.py index 1eaa18c..dca0afa 100644 --- a/tap_mongodb/sync_strategies/common.py +++ b/tap_mongodb/sync_strategies/common.py @@ -289,12 +289,14 @@ def row_to_schema(schema, row): # get pointer to field's anyOf list if not schema.get('properties', {}).get(field): - schema['properties'][field] = {'anyOf': [{}]} + schema['properties'][field] = {'anyOf': [{"type": "null"}]} anyof_schema = schema['properties'][field]['anyOf'] # add value's schema to anyOf list changed = add_to_any_of(anyof_schema, value) or changed + break + return changed def get_sync_summary(catalog): diff --git a/tap_mongodb/sync_strategies/full_table.py b/tap_mongodb/sync_strategies/full_table.py index 6934757..885abc3 100644 --- a/tap_mongodb/sync_strategies/full_table.py +++ b/tap_mongodb/sync_strategies/full_table.py @@ -17,8 +17,10 @@ def get_max_id_value(collection): return None + + # pylint: disable=too-many-locals,invalid-name,too-many-statements -def sync_collection(client, stream, state, projection): +def sync_collection(client, stream, state, projection, fields_to_drop): tap_stream_id = stream['tap_stream_id'] LOGGER.info('Starting full table sync for %s', tap_stream_id) @@ -27,6 +29,9 @@ def sync_collection(client, stream, state, projection): db = client[database_name] collection = db[stream['stream']] + + for field in fields_to_drop: + collection.update_many({}, {'$unset': {field: ''}}) #before writing the table version to state, check if we had one to begin with first_run = singer.get_bookmark(state, stream['tap_stream_id'], 'version') is None @@ -110,12 +115,13 @@ def sync_collection(client, stream, state, projection): rows_saved += 1 schema_build_start_time = time.time() - if common.row_to_schema(schema, row): - singer.write_message(singer.SchemaMessage( - stream=common.calculate_destination_stream_name(stream), - schema=schema, - key_properties=['_id'])) - common.SCHEMA_COUNT[stream['tap_stream_id']] += 1 + # if common.row_to_schema(schema, row): + + # singer.write_message(singer.SchemaMessage( + # stream=common.calculate_destination_stream_name(stream), + # schema=schema, + # key_properties=['_id'])) + # common.SCHEMA_COUNT[stream['tap_stream_id']] += 1 common.SCHEMA_TIMES[stream['tap_stream_id']] += time.time() - schema_build_start_time record_message = common.row_to_singer_record(stream, diff --git a/tap_mongodb/sync_strategies/incremental.py b/tap_mongodb/sync_strategies/incremental.py index 4fc533f..237e323 100644 --- a/tap_mongodb/sync_strategies/incremental.py +++ b/tap_mongodb/sync_strategies/incremental.py @@ -26,13 +26,16 @@ def update_bookmark(row, state, tap_stream_id, replication_key_name): replication_key_type) # pylint: disable=too-many-locals, too-many-statements -def sync_collection(client, stream, state, projection): +def sync_collection(client, stream, state, projection, fields_to_drop): tap_stream_id = stream['tap_stream_id'] LOGGER.info('Starting incremental sync for %s', tap_stream_id) stream_metadata = metadata.to_map(stream['metadata']).get(()) collection = client[stream_metadata['database-name']][stream['stream']] + for field in fields_to_drop: + collection.update_many({}, {'$unset': {field: ''}}) + #before writing the table version to state, check if we had one to begin with first_run = singer.get_bookmark(state, stream['tap_stream_id'], 'version') is None @@ -93,12 +96,12 @@ def sync_collection(client, stream, state, projection): for row in cursor: schema_build_start_time = time.time() - if common.row_to_schema(schema, row): - singer.write_message(singer.SchemaMessage( - stream=common.calculate_destination_stream_name(stream), - schema=schema, - key_properties=['_id'])) - common.SCHEMA_COUNT[tap_stream_id] += 1 + # if common.row_to_schema(schema, row): + # singer.write_message(singer.SchemaMessage( + # stream=common.calculate_destination_stream_name(stream), + # schema=schema, + # key_properties=['_id'])) + # common.SCHEMA_COUNT[tap_stream_id] += 1 common.SCHEMA_TIMES[tap_stream_id] += time.time() - schema_build_start_time