美文网首页
python读取MySQL插入ES

python读取MySQL插入ES

作者: HAO延WEI | 来源:发表于2019-06-01 17:08 被阅读0次

    注释:1. MySQL 表中的字段类型需要与 ES 索引中的字段类型相互对应;
    2. 日期需转为字符串,且其中的“-”需替换为“/”(与索引映射中的 yyyy/MM/dd 日期格式保持一致)。

    # -*- coding: utf-8 -*-
    
    """
    Create by Mr.Hao on 2019/5/28.
    
    """
    import sys
    reload(sys)
    sys.setdefaultencoding('utf8')
    
    import pymysql
    from pymysql.cursors import DictCursor
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk
    
    
    class FinanceStorage(object):
        """Read rows of ``media_consume_table`` from the local MySQL database."""

        def __init__(self, *args, **kwargs):
            super(FinanceStorage, self).__init__(*args, **kwargs)

        def connection(self):
            """Open a new PyMySQL connection to the ``bdp_spider`` database.

            The caller owns the returned connection and must close it.
            """
            return pymysql.connect(host='127.0.0.1',
                                   user='root',
                                   password="",
                                   database='bdp_spider',
                                   # Request utf8mb4 explicitly so non-ASCII (e.g. Chinese)
                                   # text is decoded correctly regardless of the driver default.
                                   charset='utf8mb4')

        def get_data(self, sql="SELECT * FROM media_consume_table;"):
            """Run *sql* and return every result row as a dict.

            :param sql: query to execute; defaults to all rows of
                ``media_consume_table``. It is executed verbatim, so it must
                never be assembled from untrusted input.
            :return: the rows fetched through ``DictCursor`` (column name -> value)
            """
            conn = self.connection()
            try:
                cursor = conn.cursor(DictCursor)
                try:
                    cursor.execute(sql)
                    return cursor.fetchall()
                finally:
                    # Closed even when execute/fetchall raises (previously leaked).
                    cursor.close()
            finally:
                conn.close()
    
    
    class ElasticObj(object):
        """Create an Elasticsearch index and bulk-load MySQL rows into it.

        Rows come from ``FinanceStorage().get_data()``. The mapping is nested
        under a mapping-type name and every bulk action sets ``_type``, so this
        needs an Elasticsearch release that still has mapping types
        (NOTE(review): presumably 6.x -- confirm against the target cluster).
        """

        # Accepted formats for every date field. MySQL renders dates as
        # "yyyy-MM-dd ...", which is why _format_date swaps "-" for "/".
        _DATE_FORMAT = "yyyy/MM/dd HH:mm:ss||yyyy/MM/dd||epoch_millis"

        def __init__(self, index_name, index_type, ip="127.0.0.1", port=9200,
                     http_auth=('elastic', 'password')):
            """
            :param index_name: name of the index documents are written to
            :param index_type: mapping type name (plays the role of a table name)
            :param ip: Elasticsearch host
            :param port: Elasticsearch HTTP port
            :param http_auth: (user, password) for HTTP basic auth; the default
                is the placeholder credential this script has always used
            """
            self.index_name = index_name
            self.index_type = index_type
            self.es = Elasticsearch([ip], http_auth=http_auth, port=port)

        @classmethod
        def _index_mappings(cls, index_type):
            """Return the ``indices.create`` body declaring every field's type.

            :param index_type: mapping type the properties are nested under
            """
            def indexed_keyword():
                return {"type": "keyword", "index": True}

            def stored_keyword():
                return {"type": "keyword", "store": True}

            def stored_date():
                return {"type": "date", "store": True, "format": cls._DATE_FORMAT}

            properties = {
                "id": {"type": "long", "store": True},
                "dt": stored_date(),
                "product": indexed_keyword(),
                # Was "meida": documents are written under "media", so the
                # misspelt field stayed empty and "media" got a dynamic mapping.
                "media": indexed_keyword(),
                # Misspelling kept on purpose: documents use this exact key.
                "meida_sub_categorey": indexed_keyword(),
                "account": indexed_keyword(),
                # Full-text field; an IK analyzer ("ik_max_word") could be
                # configured here if that plugin is installed.
                "ad_campaign": {"type": "text"},
                "ad_type": indexed_keyword(),
                "consume": stored_keyword(),
                "shows": stored_keyword(),
                "clicks": stored_keyword(),
                "pay_downloads": stored_keyword(),
                "publish_time": stored_date(),
                "update_time": stored_date(),
                # Was "string", which Elasticsearch 6+ rejects at index
                # creation; "text" is its analysed replacement.
                "ad_group": {"type": "text", "store": True},
                "ad_case": {"type": "text", "store": True},
            }
            return {"mappings": {index_type: {"properties": properties}}}

        def create_index(self, index_name='data', index_type="media_consume_table"):
            """Create *index_name* with the field mapping unless it already exists.

            The defaults are fixed literals, not ``self.index_name`` /
            ``self.index_type``; pass both explicitly for any other index.
            """
            if not self.es.indices.exists(index=index_name):
                res = self.es.indices.create(index=index_name,
                                             body=self._index_mappings(index_type))
                print(res)

        @staticmethod
        def _format_date(value):
            """Render a MySQL date/datetime as the "yyyy/MM/dd[ HH:mm:ss]" text
            the mapping accepts; SQL NULL stays None (JSON null).
            """
            import datetime

            if value is None:
                return None
            if isinstance(value, datetime.datetime):
                # Fractional seconds would not match the declared date formats.
                value = value.replace(microsecond=0)
            return str(value).replace("-", "/")

        def _build_action(self, doc_id, row):
            """Return the bulk "index" action for one ``DictCursor`` row.

            Text values are passed through unchanged: the client serializes
            unicode itself, while the former ``.encode('utf-8')`` crashed on
            NULL columns and yields bytes on Python 3.
            """
            return {
                "_index": self.index_name,
                "_type": self.index_type,
                "_id": doc_id,
                "_source": {
                    "id": row['id'],
                    "dt": self._format_date(row['dt']),
                    "product": row['product'],
                    "media": row['media'],
                    # The MySQL column is "media_sub_category"; the index field
                    # keeps its historical spelling so existing data still matches.
                    "meida_sub_categorey": row['media_sub_category'],
                    "account": row['account'],
                    "ad_type": row['ad_type'],
                    "ad_campaign": row.get('ad_campaign'),
                    "consume": row.get('consume'),
                    "shows": row.get('shows'),
                    "clicks": row.get('clicks'),
                    "publish_time": self._format_date(row['publish_time']),
                    "update_time": self._format_date(row['update_time']),
                    "ad_case": row['ad_case'],
                    "ad_group": row['ad_group'],
                },
            }

        def bulk_Index_Data(self):
            """Store every row from ``FinanceStorage`` in the index with one bulk call.

            Document ids are positional (1..N in result order), not the row's
            ``id`` column. NOTE(review): the query has no ORDER BY, so an id
            may map to a different row across runs.

            :return: number of successfully executed actions
            """
            rows = FinanceStorage().get_data()
            actions = [self._build_action(doc_id, row)
                       for doc_id, row in enumerate(rows, 1)]
            # A single bulk call: previously bulk() ran inside the row loop with
            # the growing list, re-sending every earlier document each time.
            success, _ = bulk(self.es, actions, index=self.index_name,
                              raise_on_error=True)
            print('Performed %d actions' % success)
            return success
    
    if __name__ == '__main__':
        # Copy MySQL rows into the "data" index under type "media_consume_table".
        # If the index does not exist yet, run loader.create_index() first so the
        # explicit field mapping is in place before any documents arrive;
        # otherwise the index is presumably auto-created with dynamic mapping.
        loader = ElasticObj("data", "media_consume_table")
        loader.bulk_Index_Data()
    
    

    结果:


    image.png

    相关文章

      网友评论

          本文标题:python读取MySQL插入ES

          本文链接:https://www.haomeiwen.com/subject/hhuatctx.html