美文网首页elasticsearch
Python从mysql导入数据到elasticsearch

Python从mysql导入数据到elasticsearch

作者: idealfeng | 来源:发表于2016-11-17 11:39 被阅读276次

    1:安装 Python 的 elasticsearch 包:pip install elasticsearch

    2:安装MySQLdb

    1) 从 https://pypi.org/project/MySQL-python/ 下载 MySQLdb 安装包(注意:MySQL-python 只支持 Python 2;在 Python 3 上可直接执行 pip install mysqlclient,它提供同名的 MySQLdb 模块,无需以下步骤)

    2)        解压下载的压缩包

    3)        进入MySQLdb安装包目录

    4)        python setup.py build

    5)        python setup.py install

    3:导入数据。以 case_fix_attrs 表为例,导入 fixnum 条数据。

    #!/usr/bin/env python

    # -*- coding: utf-8 -*-

    import MySQLdb

    import os

    import hashlib

    from elasticsearch import Elasticsearch

    import elasticsearch.helpers

    # DB parameters.
    # Each value may be overridden through an environment variable so real
    # credentials do not have to be committed to source; the literals below
    # are the placeholder defaults used when the variable is unset.
    strHost = os.environ.get('MYSQL_HOST', 'xxx.xxx.xxx.xxx')
    # Database name; the export script also uses it as the Elasticsearch index name.
    strDB = os.environ.get('MYSQL_DB', 'database')
    strUser = os.environ.get('MYSQL_USER', 'user')
    strPasswd = os.environ.get('MYSQL_PASSWORD', 'password')
    # Table to export; the export script also uses it as the Elasticsearch document type.
    table = os.environ.get('MYSQL_TABLE', 'table')
    # Number of rows to import. A value > 0 fixes the row count (rounded up to
    # a whole page by the paginator); 0 or less means "count the table's rows".
    fixnum = int(os.environ.get('FIXNUM', '1000000'))

    def seEncode(ustr, encoding='utf-8'):
        """Convert a value bound for the database into an encoded string.

        - ``None`` becomes ``''``.
        - Byte strings are returned unchanged (they are already encoded).
        - Text (``unicode`` on Python 2, ``str`` on Python 3) is encoded with
          *encoding*; characters that cannot be encoded are dropped
          (``'ignore'`` error handler).
        - Any other value is converted with ``str()``.
        """
        if ustr is None:
            return ''
        try:
            text_type = unicode  # Python 2 text type
        except NameError:
            text_type = str  # Python 3: ``unicode`` no longer exists
        if isinstance(ustr, bytes):
            # On Python 2 ``bytes is str``, so this equals ``str(ustr)``;
            # on Python 3 it avoids turning b'abc' into the text "b'abc'".
            return ustr
        if isinstance(ustr, text_type):
            return ustr.encode(encoding, 'ignore')
        return str(ustr)

    # connect to DB
    def getConnect(db=strDB, host=strHost, user=strUser, passwd=strPasswd, charset="utf8"):
        """Open and return a MySQLdb connection.

        Every argument is forwarded to ``MySQLdb.connect``. The defaults are
        the module-level DB parameters and the ``utf8`` client charset.
        """
        return MySQLdb.connect(host=host, db=db, user=user, passwd=passwd, charset=charset)

    def initClientEncode(conn):
        """Return a fresh cursor on *conn* after switching the session to utf8.

        Runs ``SET NAMES utf8`` and commits before handing the cursor back.
        The cursor is left open; the caller is responsible for closing it.
        """
        set_names_stmt = "SET NAMES utf8"
        new_cursor = conn.cursor()
        new_cursor.execute(set_names_stmt)
        conn.commit()
        return new_cursor

    class MySQLQueryPagination(object):
        """Read the module-level ``table`` page by page with LIMIT queries.

        NOTE(review): the SQL actually executed is built from the module-level
        ``table`` name; the ``sql`` argument threaded through the methods is
        accepted but not used. The paging query also has no ORDER BY, so
        MySQL does not guarantee a stable row order across pages; rows can be
        skipped or repeated. Consider ordering by the primary key.
        """

        def __init__(self, conn, numPerPage=50000):
            """Store the DB connection and the number of rows fetched per page."""
            self.conn = conn
            self.numPerPage = numPerPage

        def queryForList(self, sql):
            """Yield the fetched rows of each page, one sequence per page.

            The page count comes from ``fixnum`` when it is > 0, otherwise
            from ``SELECT COUNT(*)`` on ``table``.
            """
            totalPageNum = self.__calTotalPages(sql)
            for pageIndex in range(totalPageNum):
                yield self.__queryEachPage(sql, pageIndex)

        def __createPaginaionQuerySql(self, sql, currentPageIndex):
            """Build the LIMIT query for the 0-based page *currentPageIndex*."""
            startIndex = self.__calStartIndex(currentPageIndex)
            # ``table`` is a module constant, not user input, so plain string
            # formatting is used here; never route untrusted names through it.
            return r'select * from %s limit %s,%s' % (table, startIndex, self.numPerPage)

        def __queryEachPage(self, sql, currentPageIndex):
            """Run the query for one page and return all of its rows."""
            curs = initClientEncode(self.conn)
            try:
                curs.execute(self.__createPaginaionQuerySql(sql, currentPageIndex))
                return curs.fetchall()
            finally:
                # Close the cursor even when execute/fetchall raises.
                curs.close()

        def __calStartIndex(self, currentPageIndex):
            """Return the row offset at which the 0-based page starts."""
            return currentPageIndex * self.numPerPage

        def __calTotalRowsNum(self, sql):
            """Return the row count of ``table`` (0 if the query yields no row)."""
            curs = initClientEncode(self.conn)
            try:
                curs.execute(r'select count(*) from ' + table)
                result = curs.fetchone()
            finally:
                curs.close()
            if result is None:
                return 0
            return int(result[0])

        def __calTotalPages(self, sql):
            """Return the number of pages needed to cover the rows to import.

            Uses ``fixnum`` when it is > 0, otherwise the table's row count,
            and rounds up to a whole page. ``divmod`` keeps the result an int
            on Python 3, where ``/`` would produce a float and break ``range``.
            """
            if fixnum > 0:
                totalRowsNum = fixnum
            else:
                totalRowsNum = self.__calTotalRowsNum(sql)
            totalPages, remainder = divmod(totalRowsNum, self.numPerPage)
            if remainder:
                totalPages += 1
            return totalPages

        def __calLastIndex(self, totalRows, totalPages, currentPageIndex):
            """Compute the end index of a page.

            NOTE(review): this method is not called anywhere in this script.
            It multiplies the page index by the page size for non-final pages
            and compares it with ``totalPages`` for the final one. That looks
            like a 1-based page index, unlike the 0-based index used by
            ``queryForList``. Confirm before relying on it.
            """
            lastIndex = 0
            if totalRows < self.numPerPage:
                lastIndex = totalRows
            elif ((totalRows % self.numPerPage == 0)
                  or (totalRows % self.numPerPage != 0 and currentPageIndex < totalPages)):
                lastIndex = currentPageIndex * self.numPerPage
            elif totalRows % self.numPerPage != 0 and currentPageIndex == totalPages:  # last page
                lastIndex = totalRows
            return lastIndex

        def _parse_serialize_case_fix_attrs(self, row):
            """Turn one ``case_fix_attrs`` row into an ``index`` bulk action.

            The target index is ``strDB`` and the document type is ``table``.
            Assumes the row's first four columns are id, title, no, remark in
            that order -- TODO confirm against the table schema.
            NOTE(review): Elasticsearch mapping types (``_type``) are
            deprecated in 7.x and removed in 8.x; drop the key for those.
            """
            return {
                '_op_type': 'index',
                '_index': strDB,
                '_type': table,
                '_source': {
                    "id": row[0],
                    "title": row[1],
                    "no": row[2],
                    "remark": row[3],
                },
            }

    if __name__ == '__main__':
        # Export the MySQL table to Elasticsearch one page at a time; each
        # page of rows is sent as a single bulk request.
        conn = getConnect()
        try:
            pag = MySQLQueryPagination(conn)
            # NOTE(review): MySQLQueryPagination builds its own paging query
            # from the module-level ``table``; this statement (and its %s
            # placeholder) is passed along but never executed.
            sql = r'SELECT * FROM `case_fix_attrs` WHERE id<%s'
            # ES (host/port may be overridden through the environment)
            es = Elasticsearch([{'host': os.environ.get('ES_HOST', 'xxx.xxx.xxx.xxx'),
                                 'port': int(os.environ.get('ES_PORT', '9200'))}])
            for ret in pag.queryForList(sql):
                if not ret:
                    # An empty page means fewer rows than ``fixnum`` exist;
                    # later pages start at larger offsets and would be empty
                    # too (assuming the table is not being written meanwhile).
                    break
                actions = [pag._parse_serialize_case_fix_attrs(row) for row in ret]
                elasticsearch.helpers.bulk(es, actions)
        finally:
            # Release the MySQL connection even if a query or bulk call fails.
            conn.close()

    相关文章

      网友评论

        本文标题:Python从mysql导入数据到elasticsearch

        本文链接:https://www.haomeiwen.com/subject/kpedpttx.html