美文网首页大数据 爬虫Python AI SqlPython,web开发,前端技术分享码农的世界
大规模异步新闻爬虫【3】:让MySQL数据库操作更方便

大规模异步新闻爬虫【3】:让MySQL数据库操作更方便

作者: 一墨编程学习 | 来源:发表于2019-05-05 22:31 被阅读3次

    现如今,我们能用的数据库很多,老牌关系型数据库如MySQL(MariaDB),PostgreSQL等,新型的NoSQL数据库,还有NewSqL数据库。选择实在太多,但MySQL(Mariadb)从易获取性,易使用性,稳定性,社区活跃性方面都有较大优势,所以,我们在够用的情况下都选择MySQL的。

    今天,我们就把MySQL的的操作单独拿出来探讨一下,并实现一个更方便的封装。

    Python的对MySQL的操作的模块最好的两个模块是:

    1. MySQLdb
    这是一个老牌的MySQL模块,它封装了MySQL客户端的C语言API,但是它主要支持Python 2.x的版本,后来有人叉了一个版本加入了Python 3的支持,并起名为mysqlclient -python它的pypi包名为mysqlclient,所以通过pip安装就是pip install mysqlclient

    2. PyMySQL
    这是一个纯Python实现的MySQL客户端。因为是纯Python实现,它和Python 3的异步模块aysncio可以很好的结合起来,形成了aiomysql模块,后面我们写异步爬虫时就可以对数据库进行异步操作了。

    通过以上简单的对比,我们选择了PyMySQL来作为我们的数据库客户端模块。

    我在Python中操作MySQL的时间已经有很年了,总结下来,还是龙卷风里面的那个torndb的封装使用比较方便.torndb在Python 2.x时代早就出现了,那时候它是对MySQLdb的封装。后来接触Python 3和PyMySQL,就自己参考torndb和自己的经验,对PyMySQL进行了一个封装,并给它起了个很土的名字:ezpymysql

    不过,这个很土的名字背后,还是有着让人省心的方便,希望小猿们能看在它好用的份儿上,别计较它很土的名字。

    废话不多讲,代码接着上!

    1.使用示例

    首先我们先通过一个使用例子看看它的方便性:

    from ezpymysql import Connection
    
    db = Connection(
        'localhost',
        'db_name',
        'user',
        'password'
    )
    # 获取一条记录
    sql = 'select * from test_table where id=%s'
    data = db.get(sql, 2)
    
    # 获取多天记录
    sql = 'select * from test_table where id>%s'
    data = db.query(sql, 2)
    
    # 插入一条数据
    sql = 'insert into test_table(title, url) values(%s, %s)'
    last_id = db.execute(sql, 'test', 'http://a.com/')
    # 或者
    last_id = db.insert(sql, 'test', 'http://a.com/')
    
    # 使用更高级的方法插入一条数据
    item = {
        'title': 'test',
        'url': 'http://a.com/',
    }
    last_id = db.table_insert('test_table', item)
    
    

    它的使用分两步:
    首先,建立一个MySQL连接;
    然后,通过sql语句查询或插入数据。

    SQLAchemy之类的ORM呢?简单说,就是因为这个简单,我们的操作基本上都是查询和插入,用基本的选择,插入这些sql语句是最方便和简单的。而ORM要先对表建立映射模型,查询方法也是因ORM而不同,过度的封装很不适合爬虫应用场景。其实,老猿我在写网络应用时,仍然是自己写SQL,感觉就是那么的清爽!

    好吧,不再卖关子了,该上ezpymysql的实现了。

    2.具体实现

    #File: ezpymysql.py
    #Author: veelion
    
    """A lightweight wrapper around PyMySQL.
    only for python3
    
    """
    
    import time
    import logging
    import traceback
    import pymysql.cursors
    
    version = "0.7"
    version_info = (0, 7, 0, 0)
    
    class Connection(object):
        """A lightweight wrapper around PyMySQL.
        """
        def __init__(self, host, database, user=None, password=None,
                     port=0,
                     max_idle_time=7 * 3600, connect_timeout=10,
                     time_zone="+0:00", charset = "utf8mb4", sql_mode="TRADITIONAL"):
            self.host = host
            self.database = database
            self.max_idle_time = float(max_idle_time)
    
            args = dict(use_unicode=True, charset=charset,
                        database=database,
                        init_command=('SET time_zone = "%s"' % time_zone),
                        cursorclass=pymysql.cursors.DictCursor,
                        connect_timeout=connect_timeout, sql_mode=sql_mode)
            if user is not None:
                args["user"] = user
            if password is not None:
                args["passwd"] = password
    
            # We accept a path to a MySQL socket file or a host(:port) string
            if "/" in host:
                args["unix_socket"] = host
            else:
                self.socket = None
                pair = host.split(":")
                if len(pair) == 2:
                    args["host"] = pair[0]
                    args["port"] = int(pair[1])
                else:
                    args["host"] = host
                    args["port"] = 3306
            if port:
                args['port'] = port
    
            self._db = None
            self._db_args = args
            self._last_use_time = time.time()
            try:
                self.reconnect()
            except Exception:
                logging.error("Cannot connect to MySQL on %s", self.host,
                              exc_info=True)
    
        def _ensure_connected(self):
            # Mysql by default closes client connections that are idle for
            # 8 hours, but the client library does not report this fact until
            # you try to perform a query and it fails.  Protect against this
            # case by preemptively closing and reopening the connection
            # if it has been idle for too long (7 hours by default).
            if (self._db is None or
                (time.time() - self._last_use_time > self.max_idle_time)):
                self.reconnect()
            self._last_use_time = time.time()
    
        def _cursor(self):
            self._ensure_connected()
            return self._db.cursor()
    
        def __del__(self):
            self.close()
    
        def close(self):
            """Closes this database connection."""
            if getattr(self, "_db", None) is not None:
                self._db.close()
                self._db = None
    
        def reconnect(self):
            """Closes the existing database connection and re-opens it."""
            self.close()
            self._db = pymysql.connect(**self._db_args)
            self._db.autocommit(True)
    
        def query(self, query, *parameters, **kwparameters):
            """Returns a row list for the given query and parameters."""
            cursor = self._cursor()
            try:
                cursor.execute(query, kwparameters or parameters)
                result = cursor.fetchall()
                return result
            finally:
                cursor.close()
    
        def get(self, query, *parameters, **kwparameters):
            """Returns the (singular) row returned by the given query.
            """
            cursor = self._cursor()
            try:
                cursor.execute(query, kwparameters or parameters)
                return cursor.fetchone()
            finally:
                cursor.close()
    
        def execute(self, query, *parameters, **kwparameters):
            """Executes the given query, returning the lastrowid from the query."""
            cursor = self._cursor()
            try:
                cursor.execute(query, kwparameters or parameters)
                return cursor.lastrowid
            except Exception as e:
                if e.args[0] == 1062:
                    pass
                else:
                    traceback.print_exc()
                    raise e
            finally:
                cursor.close()
    
        insert = execute
    
        ## =============== high level method for table ===================
    
        def table_has(self, table_name, field, value):
            if isinstance(value, str):
                value = value.encode('utf8')
            sql = 'SELECT %s FROM %s WHERE %s="%s"' % (
                field,
                table_name,
                field,
                value)
            d = self.get(sql)
            return d
    
        def table_insert(self, table_name, item):
            '''item is a dict : key is mysql table field'''
            fields = list(item.keys())
            values = list(item.values())
            fieldstr = ','.join(fields)
            valstr = ','.join(['%s'] * len(item))
            for i in range(len(values)):
                if isinstance(values[i], str):
                    values[i] = values[i].encode('utf8')
            sql = 'INSERT INTO %s (%s) VALUES(%s)' % (table_name, fieldstr, valstr)
            try:
                last_id = self.execute(sql, *values)
                return last_id
            except Exception as e:
                if e.args[0] == 1062:
                    # just skip duplicated item
                    pass
                else:
                    traceback.print_exc()
                    print('sql:', sql)
                    print('item:')
                    for i in range(len(fields)):
                        vs = str(values[i])
                        if len(vs) > 300:
                            print(fields[i], ' : ', len(vs), type(values[i]))
                        else:
                            print(fields[i], ' : ', vs, type(values[i]))
                    raise e
    
        def table_update(self, table_name, updates,
                         field_where, value_where):
            '''updates is a dict of {field_update:value_update}'''
            upsets = []
            values = []
            for k, v in updates.items():
                s = '%s=%%s' % k
                upsets.append(s)
                values.append(v)
            upsets = ','.join(upsets)
            sql = 'UPDATE %s SET %s WHERE %s="%s"' % (
                table_name,
                upsets,
                field_where, value_where,
            )
            self.execute(sql, *(values))
    
    

    3.使用方法

    这个实现是对pymysql的简单封装,但提供了一些方便的操作:

    1.建立MySQL连接

    db = Connection(
        'localhost',
        'db_name',
        'user',
        'password'
    )
    
    

    一般只需要四个参数就可以建立连接了:

    • 主持人:数据库地址,本节就是本地主机
    • database:数据库名
    • user:数据库用户名
    • 密码:数据库用户的密码

    后面还有几个参数可酌情使用:

    • max_idle_time:MySQL服务器默认8小时闲置就会断开客户端的连接;这个参数告诉客户端闲置多长时间要重新连接;
    • time_zone:这里默认时区为0区,你可以设置为自己的时区,比如东8区+8:00;
    • 字符集:默认为utf8mb4,即支持门司字符的UTF8;

    操作数据库

    数据库操作分为两类:读和写。
    读操作:使用get()获取一个数据,返回的是一个dict,key就是数据库表的字段;使用query()来获取一组数据,返回的是一个列表,其中每个项目是一个dict,跟get()返回的字典一样。
    写操作:使用insert()或execute(),看源码就知道,inseret就是执的别名。

    3.高级操作

    以table_开头的方法:

    • table_has()查询某个值是否存在于表中。查询的字段最好建立的在MySQL中建立了索引,不然数据量稍大就会很慢。
    • table_insert()把一个字典类型的数据插入表中。字典的key必须是表的字段。
    • table_update()更新表中的一条记录。其中,field_where最好是建立了索引,不然数据量稍大就会很慢。

    好了,这就是我们封装的MySQL的数据库模块,通过简洁的方法来使用,加快我们今后写爬虫的速度,是写爬虫存储数据的居家必备之良器哦,还不赶紧收藏起来。

    爬虫知识点

    1. logging模块
    Python提供的输出日志的模块,可以输出到屏幕(stdout,stderr),也可以输出到文件。爬虫在运行过程中,可能会碰到千奇百怪的异常,把这些异常都记录下来,可以很好的帮助改善爬虫。

    2. pymysql
    一个纯Python实现的MySQL客户端。在使用中,我们把它封装为ezpymysql。

    相关文章

      网友评论

        本文标题:大规模异步新闻爬虫【3】:让MySQL数据库操作更方便

        本文链接:https://www.haomeiwen.com/subject/harkoqtx.html