Код: Выделить всё
import multiprocessing
import sys
import pymongo
import datetime
from multiprocessing import Pool
def query_records_by_date(ticker, start_of_day):
"""
This function looks up records from MongoDB using parameters provided and return a list
"""
mongo = pymongo.MongoClient("mongodb://localhost:27017/")
...
def get_all_post_dates(ticker):
"""
This function uses aggregations to get something from MongoDB and return a list of datetime.datetime objects
"""
mongo = pymongo.MongoClient("mongodb://localhost:27017/")
...
def process_ticker_attention(ticker, finish_num, lock):
mongo_p = pymongo.MongoClient("mongodb://localhost:27017/", connectTimeoutMS=600000)
try:
ticker_dates = get_all_post_dates(ticker)
for date in ticker_dates:
data = query_records_by_date(ticker, date)
mongo_p['stock']['attention'].update_one({'date': date}, {
'$set': {
'post_numbers': {
ticker: len(data)
}
}
})
lock.acquire()
finish_num.value += 1
sys.stdout.write(f'\rfinish_count: {finish_num.value}')
sys.stdout.flush()
lock.release()
except Exception as e:
print('error!', e.__class__.__name__, e)
if __name__ == '__main__':
mongo = pymongo.MongoClient("mongodb://localhost:27017/")
database_name = 'StockForum'
lock = multiprocessing.Lock()
finish_num = multiprocessing.Manager().Value('i', 0)
tickers = mongo[database_name].list_collection_names()
p = Pool(6)
for t in tickers:
p.apply_async(process_ticker_attention, args=(t, finish_num, lock))
p.close()
p.join()
Поначалу код работает хорошо, но примерно через 20-30 секунд выдает ошибку:< /p>
Код: Выделить всё
error! AutoReconnect localhost:27017: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted (configured timeouts: connectTimeoutMS: 20000.0ms)
Подробнее здесь: https://stackoverflow.com/questions/792 ... processing
Мобильная версия