Как обрабатывать повторяющиеся ключевые результаты на этапе Mapper для реализации HDFS PageRank?Python

Программы на Python
Ответить
Anonymous
 Как обрабатывать повторяющиеся ключевые результаты на этапе Mapper для реализации HDFS PageRank?

Сообщение Anonymous »

Я писал код PageRank для работы в HDFS, поэтому написал Mapper и Редюсер. У меня есть данные в следующем формате: страница «исходящие_ссылки», например:

Код: Выделить всё

Page_1  Page_18,Page_109,Page_696,Page_579,Page_14,Page_532,Page_705
Page_2  Page_671,Page_231,Page_631,Page_45,Page_79,Page_163,Page_419,Page_653
Page_3  Page_807
Page_4  Page_358,Page_716
....
Page_1000    Page_295,Page_186,Page_585,Page_958,Page_765,Page_495,Page_639
Я создал картограф, чтобы разделить страницы на следующий формат: страница:[исходящие страницы]. Я пытаюсь присвоить странице рейтинг на основе входящих на нее ссылок. Например, если страница_1 имеет две исходящие ссылки, и одна из них ведет на страницу_18, то страница_18 получит половину рейтинга от страницы_1, поскольку у нее есть одна из двух исходящих ссылок.
Чтобы упростим задачу, если у нас есть этот набор данных:

Код: Выделить всё

A    B,C
B    C
C    A,B,D
D    C
Начнем с A. Если у A есть исходящие ссылки на B и C, я делю рейтинг A поровну между B и C. Таким образом, B получает половину ранга A, а C — другую половину. . Я продолжаю делать это для всех страниц.
Например, я вижу, что у A есть исходящие ссылки на B и C, а это значит, что у B и C есть входящие ссылки от A. Поскольку у A есть две исходящие ссылки ссылки, ее рейтинг делится на 2 для каждой B и C.
Я реализовал эту логику в редукторе, чтобы суммировать все вклады для каждой страницы и завершить формулу.
Формула PageRank
Сортировка не сработала. поначалу правильно, потому что формат имен страниц был похож на page_1, а терминал HDFS сортирует ключи по алфавиту. Чтобы это исправить, я добавил нулевое дополнение к номерам страниц, и все работало нормально.
Однако я столкнулся с другой проблемой: на некоторых страницах не было входящих ссылок. Сопоставитель напечатал только начальное значение для этих страниц. Чтобы справиться с этим, я создал в картографе два набора — один для напечатанных страниц, а другой для проверки того, не была ли страница еще напечатана. Если страница не печаталась, я обязательно распечатывал ее.
Когда я тестировал код локально, все работало хорошо. Но в HDFS я получил неправильный вывод. Это произошло потому, что HDFS работает распределенным образом, и два узла (зависит от того) проверяли одну и ту же страницу, в результате чего она печаталась дважды. Вы можете видеть это на картинке — это выходные данные преобразователя HDFS.
Выходные данные преобразователя HDFS
Я обработал эту ошибку, создав счетчик в Редюсере. Если счетчик не равен текущей странице, я печатаю страницу со значением 0,001. Мой вопрос: есть ли способ исправить эту ошибку в Mapper, а не в Редюсере?
Если вы запрашиваете коды картографа и редуктора, вот они:
Картограф:

Код: Выделить всё

#! /usr/bin/python3

import sys
import re

initial_rank = 1 / 1000 # 1 / number of pages

def mapper():

for line in sys.stdin:
line = line.strip()
page, links = line.split() # Page: [list]
links = links.split(',') # list[object,object]

for link in links:
match = re.search(r'\d{1,4}', link)
link = match.group(0).zfill(4)

print(f'page{link}\t{initial_rank / len(links)}') # PR(Pi) / C(Pj)

if __name__ == "__main__":
mapper()
Редуктор

Код: Выделить всё

#! /usr/bin/python3

import sys
import re

d = 0.85
N = 1000

total_contribution = 0
current_page = None

'''There are two parts for the formula'''

# part1
part1 = (1 - d) / N

counter = 1
#part2
for line in sys.stdin: # page   rank_contribution
line = line.strip()
page, contribution = line.split()
contribution = float(contribution)

# sum the contributions for that page
if current_page == page:
total_contribution += contribution
else:
if current_page:
# final rank for the page
final_rank = part1 + d * total_contribution
print(f'{counter}\t{current_page}\t{final_rank}')
counter += 1
# the next new page
current_page = page
total_contribution = contribution

if counter != int(re.search(r'\d{1,4}', current_page).group()):
print(f'{counter}\t{current_page}\t{0.001}')
counter += 1

# print the final page
if current_page:
final_rank = part1 + d * total_contribution
print(f'{counter}\t{current_page}\t{final_rank}')
Я реализовал Mapper для разделения страниц и расчета вклада в рейтинг, а также редуктор для их суммирования. Локально все работало хорошо, но на HDFS я столкнулся с проблемами дублирования записей из-за распределенной обработки. Я попытался добавить счетчик в Редюсер, чтобы справиться с этой проблемой, и это сработало, но я ищу решение, позволяющее избежать этой проблемы в самом Mapper.

Подробнее здесь: https://stackoverflow.com/questions/792 ... ank-implem
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»