-
-
Save puentesarrin/858dbb8e4b864f81d56012ac94cd0a68 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Test script: inserts and reads back fake documents through RmMongo.
import os
import sys

import raven

# Directory that contains this script; project paths are resolved from it.
CURRENT = os.path.dirname(__file__)

# Prepend the directory two levels up and its `rebelmouse` subdirectory to the
# import path (in that order) so the project imports below can be resolved.
sys.path[0:0] = [
    os.path.realpath(os.path.join(CURRENT, '..', '..')),
    os.path.realpath(os.path.join(CURRENT, '..', '..', 'rebelmouse')),
]

os.environ['DJANGO_SETTINGS_MODULE'] = 'rebelmouse.settings'

# Only create a Sentry (raven) client when a DSN is present in the environment.
if os.environ.get('SENTRY_DSN'):
    raven.Client(transport=raven.transport.http.HTTPTransport)

# NOTE(review): boot() presumably initialises the project environment and has
# to run before the project imports below -- confirm against rm.boot.
from rm.boot import boot

boot()

# Code follows below
from warnings import warn
from random import choice

from faker import Faker

from rm.rmmongo import RmMongo

# Name of the scratch collection that this script drops and then fills.
COLLECTION = 'test_collection'
def main():
    """Drop the test collection, then insert and read back documents forever.

    Every iteration inserts one fake document and then looks up a randomly
    chosen document that was inserted earlier, printing the lookup result.
    The loop has no exit condition of its own.
    """
    mongo = RmMongo.get_instance('mongo-test', read_from_master=False)
    drop_collection(mongo)
    doc_id_list = []
    while True:
        # The instance is requested again before each operation instead of
        # reusing a single handle for the whole run.
        mongo = RmMongo.get_instance('mongo-test', read_from_master=False)
        inserted_id = insert_document(mongo)
        # insert_document() yields None when the insert raised. Recording
        # those would fill the list with None entries while writes are
        # failing and make most iterations skip the read, so only ids of
        # successful inserts are kept.
        if inserted_id is not None:
            doc_id_list.append(inserted_id)
        if not doc_id_list:
            # Nothing has been inserted successfully yet; nothing to look up.
            continue
        doc_id = choice(doc_id_list)
        # Parenthesised single-argument print behaves the same as the print
        # statement on Python 2 and is also valid on Python 3.
        print('Finding {} ...'.format(str(doc_id)))
        mongo = RmMongo.get_instance('mongo-test', read_from_master=False)
        doc = find_document(mongo, doc_id)
        print('Found {}'.format(doc))
def drop_collection(mongo):
    """Drop the ``COLLECTION`` collection from the given database handle."""
    collection = mongo[COLLECTION]
    collection.drop()
def insert_document(mongo):
    """Insert one document with a fake name and phone number.

    Returns the ``inserted_id`` of the new document, or ``None`` when the
    insert raised (the error is reported by ``call_or_return_none``).
    """
    fake = Faker()
    name = fake.name()
    phone = fake.phone_number()
    document = {'name': name, 'phone': phone}

    def _insert():
        return mongo[COLLECTION].insert_one(document).inserted_id

    return call_or_return_none(_insert)
def find_document(mongo, doc_id):
    """Look up the document whose ``_id`` equals ``doc_id``.

    Returns whatever ``find_one`` returns, or ``None`` when the lookup raised
    (the error is reported by ``call_or_return_none``).
    """
    def _lookup():
        return mongo[COLLECTION].find_one({'_id': doc_id})

    return call_or_return_none(_lookup)
def call_or_return_none(func):
    """Call ``func`` with no arguments and return its result.

    Any ``Exception`` raised by ``func`` is reported as a ``RuntimeWarning``
    carrying the exception text, and ``None`` is returned instead.
    """
    result = None
    try:
        result = func()
    except Exception as error:
        # Best effort: report the failure as a warning and fall through to
        # return None rather than aborting the caller.
        warn(str(error), RuntimeWarning)
    return result
# Start the insert/find loop only when this file is executed as a script.
if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Test script: inserts and reads back fake documents through a mongoengine
# connection registered from the Django settings.
import os
import sys

import raven

# Directory that contains this script; project paths are resolved from it.
CURRENT = os.path.dirname(__file__)

# Prepend the directory two levels up and its `rebelmouse` subdirectory to the
# import path (in that order) so the project imports below can be resolved.
sys.path[0:0] = [
    os.path.realpath(os.path.join(CURRENT, '..', '..')),
    os.path.realpath(os.path.join(CURRENT, '..', '..', 'rebelmouse')),
]

os.environ['DJANGO_SETTINGS_MODULE'] = 'rebelmouse.settings'

# Only create a Sentry (raven) client when a DSN is present in the environment.
if os.environ.get('SENTRY_DSN'):
    raven.Client(transport=raven.transport.http.HTTPTransport)

# NOTE(review): boot() presumably initialises the project environment and has
# to run before the settings are used below -- confirm against rm.boot.
from rm.boot import boot

boot()

# Code follows below
from warnings import warn
from random import choice

import django.conf as conf
from faker import Faker
import mongoengine

# Name of the scratch collection that this script drops and then fills.
COLLECTION = 'test_collection'
def main():
    """Drop the test collection, then insert and read back documents forever.

    Registers the ``mongo-test`` connection first. Every iteration then
    inserts one fake document and looks up a randomly chosen document that was
    inserted earlier, printing the lookup result. The loop has no exit
    condition of its own.
    """
    register_connection()
    mongo = mongoengine.connection.get_db('mongo-test')
    drop_collection(mongo)
    doc_id_list = []
    while True:
        # The database handle is requested again before each operation
        # instead of reusing a single handle for the whole run.
        mongo = mongoengine.connection.get_db('mongo-test')
        inserted_id = insert_document(mongo)
        # insert_document() yields None when the insert raised. Recording
        # those would fill the list with None entries while writes are
        # failing and make most iterations skip the read, so only ids of
        # successful inserts are kept.
        if inserted_id is not None:
            doc_id_list.append(inserted_id)
        if not doc_id_list:
            # Nothing has been inserted successfully yet; nothing to look up.
            continue
        doc_id = choice(doc_id_list)
        # Parenthesised single-argument print behaves the same as the print
        # statement on Python 2 and is also valid on Python 3.
        print('Finding {} ...'.format(str(doc_id)))
        mongo = mongoengine.connection.get_db('mongo-test')
        doc = find_document(mongo, doc_id)
        print('Found {}'.format(doc))
def register_connection():
    """Register the ``mongo-test`` mongoengine connection from Django settings.

    Reads the ``host`` and ``db`` entries of ``settings.MONGO['mongo-test']``
    and registers a connection for the resulting ``mongodb://`` URI.
    """
    alias = 'mongo-test'
    settings = conf.settings.MONGO[alias]
    uri = 'mongodb://{}/{}'.format(settings['host'], settings['db'])
    mongoengine.register_connection(alias=alias, host=uri)
def drop_collection(mongo):
    """Drop the ``COLLECTION`` collection from the given database handle."""
    collection = mongo[COLLECTION]
    collection.drop()
def insert_document(mongo):
    """Insert one document with a fake name and phone number.

    Returns the ``inserted_id`` of the new document, or ``None`` when the
    insert raised (the error is reported by ``call_or_return_none``).
    """
    fake = Faker()
    name = fake.name()
    phone = fake.phone_number()
    document = {'name': name, 'phone': phone}

    def _insert():
        return mongo[COLLECTION].insert_one(document).inserted_id

    return call_or_return_none(_insert)
def find_document(mongo, doc_id):
    """Look up the document whose ``_id`` equals ``doc_id``.

    Returns whatever ``find_one`` returns, or ``None`` when the lookup raised
    (the error is reported by ``call_or_return_none``).
    """
    def _lookup():
        return mongo[COLLECTION].find_one({'_id': doc_id})

    return call_or_return_none(_lookup)
def call_or_return_none(func):
    """Call ``func`` with no arguments and return its result.

    Any ``Exception`` raised by ``func`` is reported as a ``RuntimeWarning``
    carrying the exception text, and ``None`` is returned instead.
    """
    result = None
    try:
        result = func()
    except Exception as error:
        # Best effort: report the failure as a warning and fall through to
        # return None rather than aborting the caller.
        warn(str(error), RuntimeWarning)
    return result
# Start the insert/find loop only when this file is executed as a script.
if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Test script: inserts and reads back fake documents through a mongoengine
# replica-set connection registered from the Django settings.
import os
import sys

import raven

# Directory that contains this script; project paths are resolved from it.
CURRENT = os.path.dirname(__file__)

# Prepend the directory two levels up and its `rebelmouse` subdirectory to the
# import path (in that order) so the project imports below can be resolved.
sys.path[0:0] = [
    os.path.realpath(os.path.join(CURRENT, '..', '..')),
    os.path.realpath(os.path.join(CURRENT, '..', '..', 'rebelmouse')),
]

os.environ['DJANGO_SETTINGS_MODULE'] = 'rebelmouse.settings'

# Only create a Sentry (raven) client when a DSN is present in the environment.
if os.environ.get('SENTRY_DSN'):
    raven.Client(transport=raven.transport.http.HTTPTransport)

# NOTE(review): boot() presumably initialises the project environment and has
# to run before the settings are used below -- confirm against rm.boot.
from rm.boot import boot

boot()

# Code follows below
from warnings import warn
from random import choice

import django.conf as conf
from faker import Faker
import mongoengine
import pymongo

# Name of the scratch collection that this script drops and then fills.
COLLECTION = 'test_collection'
def main():
    """Drop the test collection, then insert and read back documents forever.

    Registers the ``mongo-test`` connection first. Every iteration then
    inserts one fake document and looks up a randomly chosen document that was
    inserted earlier, printing the lookup result. The loop has no exit
    condition of its own.
    """
    register_connection()
    mongo = mongoengine.connection.get_db('mongo-test')
    drop_collection(mongo)
    doc_id_list = []
    while True:
        # The database handle is requested again before each operation
        # instead of reusing a single handle for the whole run.
        mongo = mongoengine.connection.get_db('mongo-test')
        inserted_id = insert_document(mongo)
        # insert_document() yields None when the insert raised. Recording
        # those would fill the list with None entries while writes are
        # failing and make most iterations skip the read, so only ids of
        # successful inserts are kept.
        if inserted_id is not None:
            doc_id_list.append(inserted_id)
        if not doc_id_list:
            # Nothing has been inserted successfully yet; nothing to look up.
            continue
        doc_id = choice(doc_id_list)
        # Parenthesised single-argument print behaves the same as the print
        # statement on Python 2 and is also valid on Python 3.
        print('Finding {} ...'.format(str(doc_id)))
        mongo = mongoengine.connection.get_db('mongo-test')
        doc = find_document(mongo, doc_id)
        print('Found {}'.format(doc))
def register_connection():
    """Register the ``mongo-test`` mongoengine connection as a replica set.

    Reads ``replica_host``, ``db`` and ``replica_name`` from
    ``settings.MONGO['mongo-test']`` and registers a connection for the
    resulting ``mongodb://`` URI with the ``SECONDARY_PREFERRED`` read
    preference.
    """
    alias = 'mongo-test'
    settings = conf.settings.MONGO[alias]
    uri = 'mongodb://{}/{}'.format(settings['replica_host'], settings['db'])
    mongoengine.register_connection(
        alias=alias,
        host=uri,
        replicaSet=settings['replica_name'],
        read_preference=pymongo.ReadPreference.SECONDARY_PREFERRED,
    )
def drop_collection(mongo):
    """Drop the ``COLLECTION`` collection from the given database handle."""
    collection = mongo[COLLECTION]
    collection.drop()
def insert_document(mongo):
    """Insert one document with a fake name and phone number.

    Returns the ``inserted_id`` of the new document, or ``None`` when the
    insert raised (the error is reported by ``call_or_return_none``).
    """
    fake = Faker()
    name = fake.name()
    phone = fake.phone_number()
    document = {'name': name, 'phone': phone}

    def _insert():
        return mongo[COLLECTION].insert_one(document).inserted_id

    return call_or_return_none(_insert)
def find_document(mongo, doc_id):
    """Look up the document whose ``_id`` equals ``doc_id``.

    Returns whatever ``find_one`` returns, or ``None`` when the lookup raised
    (the error is reported by ``call_or_return_none``).
    """
    def _lookup():
        return mongo[COLLECTION].find_one({'_id': doc_id})

    return call_or_return_none(_lookup)
def call_or_return_none(func):
    """Call ``func`` with no arguments and return its result.

    Any ``Exception`` raised by ``func`` is reported as a ``RuntimeWarning``
    carrying the exception text, and ``None`` is returned instead.
    """
    result = None
    try:
        result = func()
    except Exception as error:
        # Best effort: report the failure as a warning and fall through to
        # return None rather than aborting the caller.
        warn(str(error), RuntimeWarning)
    return result
# Start the insert/find loop only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment