
Bulk database updates and bulk inserts with psycopg2

Author: barriers | Published 2019-09-30 22:38

    1. Preparing the data

        import datetime
        import json

        import pandas as pd
        from sqlalchemy import create_engine

        def update():
            # Load the local solar-radiation data
            local_data = pd.read_csv('./data.csv')
            # Instantiate the database helper class
            db = DbHandle()
            engine = create_engine("postgresql+psycopg2://username:password@host:port/database")
            data_range = [datetime.datetime.strptime('2019-04-25 08:00:00', '%Y-%m-%d %H:%M:%S'), datetime.datetime.strptime('2019-09-27 00:00:00', '%Y-%m-%d %H:%M:%S')]
            data_day_delta = (data_range[1]-data_range[0]).days
            for delta_day in range(data_day_delta + 1):
                data_add_delta_day = data_range[0] + datetime.timedelta(days=delta_day)
                for hour in range(24):
                    data_add_delta_day_hour = data_add_delta_day + datetime.timedelta(hours=hour)
                    data_add_delta_day_hour = str(data_add_delta_day_hour)
                    print(f'Updating data for {data_add_delta_day_hour}')
                    sql = f"select id, grid_id, data, to_char(published_at, 'YYYY-MM-DD HH24:MI:SS') as published_at from grid_weather where published_at='{data_add_delta_day_hour}'"
                    # Fetch the stored weather rows for this hour
                    db_data = pd.read_sql(sql, engine)
                    # Skip this hour if the dataframe is empty
                    if not db_data.empty:
                        db_data = pd.merge(db_data, local_data, left_on=['grid_id', 'published_at'], right_on=['id', 'date'])
                        if not db_data.empty:
                            # Keep only the columns we need
                            db_data = db_data[['grid_id', 'data', 'published_at', 'ssra', 'id_x']]
                            print(db_data)
                            # Convert the dataframe to a numpy array (a sequence of row arrays)
                            db_data = db_data.to_numpy()
                            insert_data = []
                            for data in db_data:
                                # If the dict in the second element already has a
                                # solar_radiation key, leave the row alone; otherwise
                                # fill it in from the ssra column
                                if 'solar_radiation' not in data[1]:
                                    data[1]['solar_radiation'] = data[3]
                                    # data = (json.dumps(data[1]), data[0], data[2])
                                    # One tuple per update row: (json-encoded data, row id);
                                    # the tuples are collected into a list (up to 5000 per batch here)
                                    insert_data.append((json.dumps(data[1]), data[4]))
                            print(insert_data)
                            if insert_data:
                                # Bulk-update the batch via the helper object; update_db
                                # expects a DataFrame whose column names match the
                                # temporary-table columns in the SQL, plus the table name
                                db.update_db(pd.DataFrame(insert_data, columns=['data', 'id']), 'grid_weather')
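
    For reference, here is a minimal sketch of the merge step above with made-up values; the database-side names (grid_id, published_at) come from the query, and id/date/ssra are the columns the local CSV is assumed to have:

        import pandas as pd

        # Hypothetical one-row frames mimicking db_data and local_data
        db_data = pd.DataFrame({'id': [7], 'grid_id': [101], 'data': [{'temp': 21.5}],
                                'published_at': ['2019-04-25 08:00:00']})
        local_data = pd.DataFrame({'id': [101], 'date': ['2019-04-25 08:00:00'],
                                   'ssra': [356.2]})
        # Both frames have an id column, so pandas suffixes them to id_x/id_y;
        # id_x is the database row's primary key used later for the update
        merged = pd.merge(db_data, local_data,
                          left_on=['grid_id', 'published_at'],
                          right_on=['id', 'date'])
        print(merged[['grid_id', 'data', 'published_at', 'ssra', 'id_x']])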
    

    2. The bulk-update method

    import psycopg2
    import psycopg2.extras
    
    class DbHandle:
        def __init__(self):
            self.link_pgsql = {
                'database': 'test',
                'user': 'username',
                'password': 'password',
                'host': '127.0.0.1',
                'port': 5432
            }
    
            self.link = psycopg2.connect(**self.link_pgsql)
            self.cursor = self.link.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    
        def update_db(self, data, name):
            # Bulk-update SQL: the passed-in rows become a temporary table new_data
            # whose columns must match the tuple order; note that a dict passed as a
            # string must be cast with ::json so Postgres stores it as json.
            # Example with an explicit column list:
            # sql = 'update grid_weather set data=new_data.data::json from (values %s) as new_data (data, grid_id, published_at_time) where grid_weather.grid_id=new_data.grid_id and published_at=published_at_time;'
            columns = data.columns
            sql = f"""update {name} set data=new_data.data::json from (values %s) as new_data ({','.join(columns)}) where {name}.id=new_data.id;"""
            try:
                print(data)
                print(len(data))
                # tolist() converts numpy scalars to native Python types,
                # which psycopg2 knows how to adapt
                data = data.to_numpy().tolist()
                # Bulk update with execute_values: pass the cursor, the SQL, the
                # rows, and the maximum number of rows per statement
                psycopg2.extras.execute_values(self.cursor, sql, data, page_size=5000)
                self.link.commit()
                print('Update succeeded')
                return True
            except Exception as e:
                print(e)
                print('Update failed')
                return False
    

    Key points for bulk updates:
    1. Assemble the data list first;
    2. Write the bulk-update SQL: treat the passed-in list as a temporary table, with the list elements (usually tuples) supplying the temporary table's columns, and update against a join condition. Any dict inside a tuple must be serialized to JSON (a string in Python), and in the UPDATE the string column must be cast with ::json to the database's json type, otherwise the statement fails. A correct form is shown below.
    sql = 'update grid_weather set data=new_data.data::json from (values %s) as new_data (data, grid_id, published_at_time) where grid_weather.grid_id=new_data.grid_id and published_at=published_at_time;'
    Prefer a unique key for the join condition and be wary of time-based columns: if the two time formats differ, rows will simply fail to match and nothing gets updated.
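
    As a concrete sketch of this pattern (connection parameters and values are placeholders), the whole round trip for a two-row batch looks like this:

        import json

        import psycopg2
        import psycopg2.extras

        conn = psycopg2.connect(database='test', user='username',
                                password='password', host='127.0.0.1', port=5432)
        cur = conn.cursor()
        # One tuple per row, in the same order as the temporary-table columns (data, id)
        rows = [
            (json.dumps({'temp': 21.5, 'solar_radiation': 356.2}), 42),
            (json.dumps({'temp': 19.0, 'solar_radiation': 298.7}), 43),
        ]
        sql = ("update grid_weather set data=new_data.data::json "
               "from (values %s) as new_data (data, id) "
               "where grid_weather.id=new_data.id")
        # execute_values expands the single %s into a multi-row VALUES list
        psycopg2.extras.execute_values(cur, sql, rows, page_size=5000)
        conn.commit()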

    3. Three ways to bulk insert

    All of these methods bulk-insert the values of a pandas DataFrame into the database.

    import psycopg2
    import psycopg2.extras
    
    class DbHandle:
        def __init__(self):
            self.link_pgsql = {
                'database': 'test',
                'user': 'spider',
                'password': '123456',
                'host': '127.0.0.1',
                'port': 5432
            }
            self.link = psycopg2.connect(**self.link_pgsql)
        self.cursor = self.link.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        def update_db(self, data, name):
            """Bulk update"""
            # sql = 'update grid_weather set data=new_data.data::json from (values %s) as new_data (data, grid_id, published_at_time) where grid_weather.grid_id=new_data.grid_id and published_at=published_at_time;'
            columns = data.columns
            sql = f"""update {name} set data=new_data.data::json from (values %s) as new_data ({','.join(columns)}) where {name}.id=new_data.id;"""
            try:
                print(data)
                print(len(data))
                # tolist() converts numpy scalars to native Python types for psycopg2
                data = data.to_numpy().tolist()
                psycopg2.extras.execute_values(self.cursor, sql, data, page_size=4900)
                self.link.commit()
                print('Update succeeded')
                return True
            except Exception as e:
                print(e)
                print('Update failed')
                return False
    
        def insert_lots_of_by_many(self, data, name):
            """Simple and practical; executemany is a method on the cursor object"""
            # sql = f'insert into {name}(grid_id, data, published_at) values (%s, %s, %s);'
            columns = data.columns
            sql = f"""insert into {name}({','.join(columns)}) values ({','.join(['%s'] * len(columns))});"""
            print(sql)
            data = data.to_numpy().tolist()
            print(data)
            self.cursor.executemany(sql, data)
            self.link.commit()
    
        def insert_lots_of_by_values(self, data, name):
            """Recommended by the psycopg2 docs; every row must supply the same columns in the same order"""
            columns = data.columns
            sql = f'insert into {name}({",".join(columns)}) values %s;'
            print(sql)
            try:
                data = data.to_numpy().tolist()
                print(data)
                print(len(data))
                psycopg2.extras.execute_values(self.cursor, sql, data, page_size=4900)
                self.link.commit()
                print('Insert succeeded')
                return True
            except Exception as e:
                print(e)
                print('Insert failed')
                return False
    
        def insert_lots_of_by_batch(self, data, name):
            """Good performance and fast; execute_batch is a helper function in psycopg2.extras"""
            # sql = f"""insert into {name}(grid_id, data, published_at) values (%s, %s, %s);"""
            columns = data.columns
            sql = f"""insert into {name}({','.join(columns)}) values ({','.join(['%s']*len(columns))});"""
            print(sql)
            try:
                data = data.to_numpy().tolist()
                psycopg2.extras.execute_batch(self.cursor, sql, data, page_size=4900)
                self.link.commit()
                print('Insert succeeded')
                return True
            except Exception as e:
                print(e)
                print('Insert failed')
                return False
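
    A short usage sketch tying the pieces together (table name and values are made up; the DataFrame's column names must match the target table's columns, since they are interpolated into the SQL):

        import pandas as pd

        db = DbHandle()
        df = pd.DataFrame({
            'grid_id': [101, 102],
            'data': ['{"temp": 21.5}', '{"temp": 19.0}'],
            'published_at': ['2019-04-25 08:00:00', '2019-04-25 09:00:00'],
        })
        # All three insert methods take the same (DataFrame, table name) pair
        db.insert_lots_of_by_values(df, 'grid_weather')
        # db.insert_lots_of_by_many(df, 'grid_weather')
        # db.insert_lots_of_by_batch(df, 'grid_weather')

    Roughly speaking, executemany issues one statement per row, execute_batch packs many statements into each network round trip, and execute_values folds each page of rows into a single multi-row VALUES statement, which is usually the fastest of the three.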
    
