#!/usr/bin/python
# 2015, May 4th
# Author: Jin, Yilong <jin28@vt.edu>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import sys
import time
import glob
import json
import codecs

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
from multiprocessing import Process, cpu_count

# Schemas and output-name templates are set in main() before the worker
# processes are started, so (on fork-based platforms) the children inherit
# them as module globals.
tweet_avro_schema = None
user_avro_schema = None
tweet_avro_output_name = None
user_avro_output_name = None
CORE = cpu_count()


def usage():
    print '========== usage =========='
    print '\tmulti_convert.py [tweet_schema_full_path] [user_schema_full_path] [tweet_output_name] [user_output_name] [JSON_dir_full_path]'
    print 'The two output names must contain a %d placeholder: each worker writes its own output file.'
    print 'For example:'
    print '\tmulti_convert.py `pwd`/tweet_schema `pwd`/user_schema tweet_avro_%d.avro user_avro_%d.avro `pwd`/my_json_files/'


def main(argv):
    global tweet_avro_schema
    global user_avro_schema
    global tweet_avro_output_name
    global user_avro_output_name

    tweet_schema = argv[1]
    user_schema = argv[2]
    tweet_avro_schema = avro.schema.parse(open(tweet_schema).read())
    user_avro_schema = avro.schema.parse(open(user_schema).read())
    tweet_avro_output_name = argv[3]
    user_avro_output_name = argv[4]
    tweet_path = argv[5]  # e.g. '/Users/Jin/Documents/cs5604project/police_results/'

    process_list = []
    file_list = glob.glob(tweet_path + '*.json')
    print 'converting %d JSON files to AVRO' % len(file_list)

    # Split the file list into one contiguous slice per core; the first
    # `mod` workers each take one extra file to absorb the remainder.
    chunk = len(file_list) / CORE
    mod = len(file_list) % CORE
    start = 0
    t_start = time.time()
    for i in xrange(0, CORE):
        if i < mod:
            end = start + chunk + 1
        else:
            end = start + chunk
        p = Process(target=convert_to_avro, args=(i, file_list, start, end))
        process_list.append(p)
        p.start()
        start = end
    for p in process_list:
        p.join()
    t_end = time.time()
    print 'total time elapsed: %f' % (t_end - t_start)
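
# Worked example of the split above, for clarity: with 10 files and 4 cores,
# chunk = 2 and mod = 2, so the workers receive the slices [0:3], [3:6],
# [6:8], and [8:10] -- the first two workers take three files each and the
# remaining two take two files each, covering all 10 files exactly once.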

def convert_to_avro(thread_num, file_list, local_start, local_end):
    print 'convert_to_avro called %d' % thread_num

    # Each worker writes its own pair of AVRO files, named by formatting the
    # worker index into the templates given on the command line.
    my_tweet_avro_output_name = tweet_avro_output_name % thread_num
    my_user_avro_output_name = user_avro_output_name % thread_num
    tweet_avro_writer = DataFileWriter(open(my_tweet_avro_output_name, "wb"), DatumWriter(), tweet_avro_schema)
    user_avro_writer = DataFileWriter(open(my_user_avro_output_name, "wb"), DatumWriter(), user_avro_schema)

    local_json_counter = 0
    local_counter = 0
    tmp_num = local_end - local_start
    for fs in file_list[local_start:local_end]:
        #if local_counter % 100 == 0:
        sys.stdout.write('\rthread_%d: processing %d/%d' % (thread_num, local_counter, tmp_num))
        sys.stdout.flush()
        file_full_path = fs
        try:
            local_counter += 1
            tmp_file = codecs.open(file_full_path, "r", "utf-8", errors='ignore')
            for line in tmp_file:
                try:
                    line = line.strip()
                except:
                    #print "something wrong parsing line %d in %s" % (lineNumber, filename)
                    continue
                if line != "":
                    try:
                        s = json.loads(line)
                        actor_string = s.get('actor')
                        if actor_string is None:
                            #print "empty actor string in line %d in %s" % (lineNumber, filename)
                            continue
                    except:
                        continue

                    tweet_id = ''
                    text_original = ''
                    text_clean = ''
                    created_at = ''
                    source = ''
                    user_screen_name = ''
                    user_id = ''
                    lang = ''
                    urls = ''
                    retweet_count = 0
                    favorite_count = 0
                    contributors_id = ''
                    coordinates = ''
                    hashtags = ''
                    user_mentions_id = ''
                    in_reply_to_user_id = ''
                    in_reply_to_status_id = ''

                    list_count = actor_string.get('listedCount') or 0
                    follower_count = actor_string.get('followersCount') or 0

                    # the GNIP id looks like 'tag:search.twitter.com,2005:<id>';
                    # keep only the numeric part after the second colon
                    tweet_id = str(s.get('id')).split(':')[2]
                    text_original = s.get('body')
                    created_at = str(s.get('postedTime'))

                    generator_str = s.get('generator')
                    if generator_str is not None:
                        source = generator_str.get('displayName')

                    user_screen_name = str(actor_string.get('preferredUsername'))
                    user_id = str(actor_string['id']).split(':')[2]

                    lang_str = actor_string.get('languages')
                    if lang_str is not None:
                        lang = lang_str[0]

                    retweet_count = int(float(s.get('retweetCount') or 0))
                    favorite_count = int(float(s.get('favoritesCount') or 0))
                    # this field does not exist in the dataset
                    contributors_id = user_screen_name

                    # get geo information here; some tweets do not have a geo field
                    geo_str = s.get('geo')
                    if geo_str is not None:
                        coordinates = "%f,%f" % (geo_str.get('coordinates')[0], geo_str.get('coordinates')[1])

                    # URLs, hashtags, and user mentions are flattened into
                    # space-separated strings
                    twitter_entities_str = s.get('twitter_entities')
                    if twitter_entities_str is not None:
                        urls_list = twitter_entities_str.get('urls')
                        if urls_list:
                            for u in urls_list:
                                urls += u.get('expanded_url') + ' '
                        hashtag_list = twitter_entities_str.get('hashtags')
                        if hashtag_list:
                            for h in hashtag_list:
                                hashtags += '#' + h.get('text') + ' '
                        user_mentions_id_list = twitter_entities_str.get('user_mentions')
                        if user_mentions_id_list:
                            for i in user_mentions_id_list:
                                user_mentions_id += '@' + i.get('screen_name') + ' '

                    in_reply_to_str = s.get('inReplyTo')
                    if in_reply_to_str is not None:
                        # the link looks like http://twitter.com/<user>/statuses/<status_id>
                        tmp_list = in_reply_to_str.get('link').split('/')
                        in_reply_to_user_id = tmp_list[3]
                        in_reply_to_status_id = tmp_list[5]

                    doc_id = 'MA_PD_' + tweet_id
                    # end of parsing; write the tweet record to the tweet AVRO
                    tweet_avro_writer.append({
                        "doc_id": doc_id,
                        "tweet_id": tweet_id,
                        "text_clean": text_clean,
                        "text_original": text_original,
                        "created_at": created_at,
                        "user_screen_name": user_screen_name,
                        "user_id": user_id,
                        "source": source,
                        "favorite_count": favorite_count,
                        "retweet_count": retweet_count,
                        "lang": lang,
                        "contributors_id": contributors_id,
                        "coordinates": coordinates,
                        "urls": urls,
                        "hashtags": hashtags,
                        "user_mentions_id": user_mentions_id,
                        "in_reply_to_user_id": in_reply_to_user_id,
                        "in_reply_to_status_id": in_reply_to_status_id,
                        "text_clean2": "null",
                        "collection": "MA_PD"
                    })
                    # write user information to the user output AVRO
                    user_avro_writer.append({
                        'user_id': user_id,
                        'user_screen_name': user_screen_name,
                        'follower_count': int(follower_count),
                        'list_count': int(list_count)
                    })
                    local_json_counter += 1
            # current file done; a new JSON file in the next iteration
            tmp_file.close()
        except:
            print 'exception happened in reading file %s, try specifying the full path' % fs
            continue

    tweet_avro_writer.close()
    user_avro_writer.close()
    print 'thread_%d processed %d JSON objects from %d JSON files' % (thread_num, local_json_counter, local_counter)


if __name__ == '__main__':
    if len(sys.argv) != 6:
        usage()
        sys.exit(1)
    main(sys.argv)
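
# A minimal read-back sanity check (a sketch, not part of the pipeline):
# the part files can be reopened with the reader classes from the same avro
# package. The file name below assumes the worker-0 output of the example
# templates shown in usage().
#
#   from avro.datafile import DataFileReader
#   from avro.io import DatumReader
#   reader = DataFileReader(open('tweet_avro_0.avro', 'rb'), DatumReader())
#   for record in reader:
#       print record['doc_id'], record['created_at']
#   reader.close()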