#!/usr/bin/python
# 2015. May 4th
# Author: Jin, Yilong jin28@vt.edu
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""Compute tweet importance values from tweet and user Avro files.

Usage:
    main.py TWEET_AVRO USER_AVRO SCHEMA OUTPUT_AVRO
"""
# print_function keeps this script runnable on both Python 2.6+ and 3.
from __future__ import print_function

# NOTE: import order is kept as-is on purpose: names pulled in by the star
# import from util could shadow (or be shadowed by) the surrounding imports.
import sys
import time
from util import *
import glob
from multiprocessing import Process, cpu_count, Manager


def usage():
    """Print the command-line usage string."""
    print('Usage: main.py [tweet_avro_full_path] [user_avro_full_path] '
          '[schema_full_path] [output_avro_full_path]')


def _chunk_bounds(total, parts):
    """Split range(total) into `parts` contiguous half-open (start, end) slices.

    The first ``total % parts`` slices get one extra element, so slice sizes
    differ by at most one.  Slices may be empty when parts > total.
    """
    chunk, mod = divmod(total, parts)
    bounds = []
    start = 0
    for i in range(parts):
        end = start + chunk + (1 if i < mod else 0)
        bounds.append((start, end))
        start = end
    return bounds


def main(tweet_avro, user_avro, output_schema, output_avro):
    """Run the tweet-importance pipeline and print per-step timings.

    Steps (the helpers and the TWEETS / TIV globals come from the star
    import of util):
      1. read_avro: read the tweet and user Avro inputs.
      2. calculate_UIV: compute user importance values.
      3. calculate_tweet_importance: compute tweet importance values in
         parallel, one process per CPU core, writing into the shared TIV
         dictionary.
      4. output_tweet_importance: export TIV to output_avro using
         output_schema.

    :param tweet_avro: path to the tweet Avro input file.
    :param user_avro: path to the user Avro input file.
    :param output_schema: path to the Avro schema used for the output.
    :param output_avro: path of the Avro file to write.
    :returns: 0 on success.  Exits the interpreter with status 1 if
        read_avro reports an error (a negative return value).
    """
    # seconds spent in each step, summed at the end
    time_list = []

    # by default, spawn as many processes as there are compute cores
    num_core = cpu_count()
    print('compute using %d processes' % num_core)

    # Step one: read the tweet and user Avro files.  While reading,
    #   1. user mention information is constructed, and
    #   2. edge information is updated.
    print('start reading input...')
    start = time.time()
    tmp = read_avro(tweet_avro, user_avro, verbose=False)
    end = time.time()
    if tmp < 0:
        print('Fatal error: reading %r or %r encountered a problem, '
              'use full paths' % (tweet_avro, user_avro))
        usage()
        sys.exit(1)
    print('\t%f second(s) elapsed' % (end - start))
    time_list.append(end - start)

    # Step two: calculate User Importance Values; the edge weights they rely
    # on were computed in step one.
    print('calculating UIV...')
    start = time.time()
    calculate_UIV()
    end = time.time()
    print('\t%f second(s) elapsed\n' % (end - start))
    time_list.append(end - start)

    # Step three: calculate tweet importance values.  Each worker only reads
    # the shared data structures and writes its own slice of tweets into
    # TIV, so the tweets are split into one contiguous slice per core.
    tweets_len = len(TWEETS)
    print('start processing: %d tweet importance values...' % tweets_len)
    bounds = _chunk_bounds(tweets_len, num_core)
    process_list = []
    start = time.time()
    for i, (start_index, end_index) in enumerate(bounds):
        p = Process(target=calculate_tweet_importance,
                    args=(True, start_index, end_index, i, TIV))
        process_list.append(p)
        p.start()
    # wait until all processes finish
    for p in process_list:
        p.join()
    end = time.time()
    # A crashed worker leaves its slice of TIV unfilled; say so instead of
    # silently exporting partial results.
    failed = [p for p in process_list if p.exitcode != 0]
    if failed:
        print('Warning: %d worker process(es) exited abnormally; '
              'results may be incomplete' % len(failed))
    print('\t%f second(s) elapsed\n' % (end - start))
    time_list.append(end - start)

    # Step four: write TIV out in Avro format.
    start = time.time()
    retval = output_tweet_importance(output_avro, output_schema)
    end = time.time()
    print('\t%f second(s) elapsed\n' % (end - start))
    print('%d tweet importance values exported' % retval)
    time_list.append(end - start)

    print('total time used: %f' % sum(time_list))
    return 0


if __name__ == "__main__":
    # script name + 4 positional arguments
    if len(sys.argv) != 5:
        usage()
        sys.exit(1)
    # tweet_avro and user_avro are the 2 input files; output_schema is the
    # Avro schema for the result; output_avro is the tweet importance
    # output path.
    tweet_avro, user_avro, output_schema, output_avro = sys.argv[1:5]
    main(tweet_avro, user_avro, output_schema, output_avro)
    print('script done')