美文网首页
python实现mysql数据同步到elasticsearch

python实现mysql数据同步到elasticsearch

作者: 宁静消失何如 | 来源:发表于2018-04-20 10:03 被阅读147次

    环境:
    python3.5
    支持包:
    pymysql
    elasticsearch_dsl

    安装 pymysql elasticsearch_dsl

    pip install elasticsearch_dsl
    pip install pymysql
    

    代码实现

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/4/18 下午6:30
    # @Author  : lizhao
    # @File    : mysql_data_to_elasticsearch.py
    # @Version : 1.0
    # 说明: 将mysql上的数据按规则放入elasticsearch中
    
    # 引入es_type包
    from tools.es_types import ZukerType
    from w3lib.html import remove_tags
    # 引入处理mysql的程序包
    import pymysql
    
    ##############
    # MySQL connection parameters (the real values are redacted in this listing).
    zukerDB_ip = '********'  # database host
    zukerDB_db = '********'  # database (schema) name
    zukerDB_user = '********'  # login user
    zukerDB_pw = '********'  # login password
    # Source table name. The identifier must be spelled zukerDB_58house_info,
    # because that is the name get_mysql_data() reads when building its query.
    zukerDB_58house_info = '********'  # table name
    
    
    ##############
    class MysqlMesToElastic():
        """One-shot copier from the MySQL listing table into Elasticsearch.

        ``get_mysql_data`` walks the table by primary key and hands every row,
        reshaped into a plain dict, to ``process_item``, which stores it as a
        ``ZukerType`` document.
        """

        # Columns concatenated (in this order) into the document's ``desc`` text.
        _DESC_COLUMNS = ('leasingMethod', 'tags', 'houseType', 'company')

        def __init__(self):
            pass

        def get_mysql_data(self):
            """Read every row of the source table and push it to Elasticsearch.

            Looks up the highest ``id`` in the table, then fetches ids
            ``1..last_id`` one at a time; ids with no row are skipped.  Each
            row found is converted with ``_row_to_item`` and passed to
            ``process_item``.  Returns ``None``; an empty table is a no-op.
            """
            # Keyword arguments: PyMySQL 1.0+ no longer accepts positional ones.
            db_zuker = pymysql.connect(host=zukerDB_ip, user=zukerDB_user,
                                       password=zukerDB_pw, database=zukerDB_db,
                                       charset="utf8",
                                       cursorclass=pymysql.cursors.DictCursor)
            try:
                with db_zuker.cursor() as cursor:
                    # A table name cannot be bound as a query parameter, so it
                    # is formatted in; it comes from a trusted module constant.
                    sql_last_id = "select id from {} order by id desc limit 1;".format(
                        zukerDB_58house_info)
                    cursor.execute(sql_last_id)
                    newest = cursor.fetchone()
                    if not newest:
                        # Empty table: nothing to synchronise.
                        return
                    last_id = newest['id']
                    print(last_id)

                    # The row id is bound by the driver instead of being
                    # interpolated into the SQL text.
                    sql_get_row = "select * from {} WHERE id = %s;".format(
                        zukerDB_58house_info)
                    for row_id in range(1, int(last_id) + 1):
                        cursor.execute(sql_get_row, (row_id,))
                        row = cursor.fetchone()
                        # Ids may have gaps (deleted rows); skip the misses.
                        if row:
                            self.process_item(self._row_to_item(row))
            finally:
                # Always release the connection, even if a query or save fails.
                db_zuker.close()

        @classmethod
        def _row_to_item(cls, row):
            """Map one MySQL row (a dict keyed by column name) to an item dict.

            NULL or non-string values in the ``desc`` source columns are
            rendered as text (NULL becomes an empty string) so that a single
            incomplete row cannot abort the whole run with a ``TypeError``.
            """
            desc_parts = ['' if row[column] is None else str(row[column])
                          for column in cls._DESC_COLUMNS]
            return {
                'title': row['name'],
                'price': row['price'],
                'create_date': row['sendTime'],
                'desc': '   '.join(desc_parts),
                'area': row['address'],
                'longitude': row['longitude'],
                'latitude': row['latitude'],
                'url': row['url'],
            }

        def process_item(self, item):
            """Save one item dict to Elasticsearch as a ``ZukerType`` document.

            ``item`` must contain the keys produced by ``_row_to_item``.  A
            failed save is reported and skipped, so one bad document does not
            stop the rest of the run.
            """
            print(item)
            zuker = ZukerType()
            zuker.title = item['title']  # listing name
            zuker.price = item['price']  # price
            zuker.create_date = item['create_date']  # publication time
            zuker.desc = item['desc']  # description
            zuker.area = item['area']  # location
            zuker.longitude = item['longitude']  # longitude
            zuker.latitude = item['latitude']  # latitude
            zuker.url = item['url']  # source url

            # Best-effort save: keep going on failure, but say what was lost
            # instead of swallowing the error silently.
            try:
                zuker.save()
            except Exception as exc:
                print("failed to save %s to elasticsearch: %r" % (item['url'], exc))
    
    if __name__ == "__main__":
        # Script entry point: run a single MySQL -> Elasticsearch copy pass.
        MysqlMesToElastic().get_mysql_data()
    
    

    相关文章

      网友评论

          本文标题:python实现mysql数据同步到elasticsearch

          本文链接:https://www.haomeiwen.com/subject/rmcakftx.html