# 使用ijson高效解析大体积JSON数据的Python流处理方案
# (Streaming approach for efficiently parsing large JSON files in Python with ijson.)
import ijson
with open('twitter_archive.json', 'rb') as f:
    # Stream each element of the top-level JSON array (ijson prefix 'item')
    # one at a time instead of loading the whole archive into memory.
    for tweet in ijson.items(f, 'item'):
        # Keep only English tweets that carry geolocation. .get() lets records
        # lacking these keys be skipped rather than raising KeyError, which
        # would abort processing of the entire archive.
        if tweet.get('lang') != 'en' or not tweet.get('geo'):
            continue
        processed = {
            'id': tweet['id_str'],
            'text': tweet['text'],
            # NOTE(review): ijson yields non-integer numbers as decimal.Decimal
            # by default; if the producer serializes values with the json
            # module, Decimal is not serializable — confirm the serializer.
            'coordinates': tweet['coordinates'],
        }
        # kafka_producer is not defined in this snippet; presumably a producer
        # exposing send(topic, value) created elsewhere — TODO confirm.
        kafka_producer.send('geo_tweets', processed)
def json_stream_processor(file_path, *, item_prefix='orders.item', flush_every=1000):
    """Yield objects from a large JSON file one at a time, flushing periodically.

    By default this yields every element of the top-level ``"orders"`` array
    (ijson prefix ``'orders.item'``) as a fully built Python object, without
    loading the whole file into memory. After every ``flush_every`` yielded
    objects, ``flush_to_database()`` is called.

    Args:
        file_path: Path of the JSON file; it is opened in binary mode for ijson.
        item_prefix: ijson prefix selecting which objects to yield.
        flush_every: Number of yielded objects between ``flush_to_database()``
            calls. It must be positive.

    Yields:
        Each complete object found at ``item_prefix``, with nested objects,
        arrays, booleans and nulls preserved.

    Raises:
        ValueError: If ``flush_every`` is not positive. Because this is a
            generator, the error surfaces on the first iteration.
    """
    if flush_every <= 0:
        raise ValueError(f"flush_every must be positive, got {flush_every!r}")

    count = 0
    with open(file_path, 'rb') as f:
        # ijson.items assembles each complete value at the exact prefix.
        # A manual ijson.parse event loop would have to track nesting depth
        # and every scalar event type (string/number/boolean/null) by hand.
        for order in ijson.items(f, item_prefix):
            yield order
            count += 1
            if count % flush_every == 0:
                flush_to_database()
    # NOTE(review): objects after the last full batch of ``flush_every`` never
    # trigger a flush here. Confirm whether the caller flushes the remainder.