要用python操作Cassandra,首先需要安装Cassandra的驱动模块(cassandra-driver),可以通过pip安装。
1. 基本操作
- 连接Cassandra
from cassandra.cluster import Cluster #引入Cluster模块
cluster = Cluster() #连接本地数据库,如果是本地地址,写不写都可以
cluster = Cluster(['127.0.0.1'])#连接本地数据库
- 创建了Cluster后,并不会自动连接上数据库,需要我们执行连接操作。
session = cluster.connect()#简单的连接
session = cluster.connect('keyspacename')#指定连接keyspace,相当于sql中use dbname
session.set_keyspace('otherkeyspace') #设置、修改keyspace
session.execute('use keyspacename')#设置、修改keyspace
- 查询:查询操作使用execute(),将cql语句拼接作为参数传入即可。
rows = session.execute('select * from emp')
for row in rows:#遍历查询的结果
print(str(row[0])+row[1]+row[2]+row[3]+str(row[4])) #如果你的row[0] 不是varchar 或者text类型,需要转一下类型,不然python会报错
for (emp_id,emp_city,emp_email,emp_name,emp_phone) in rows:#也可以用这种方式遍历查询的结果
print(str(emp_id)+emp_city+emp_email+emp_name+str(emp_phone))
- 传参查询
(1)位置传参:
session.execute(
"""
INSERT INTO emp (emp_id,emp_city,emp_email,emp_name,emp_phone)
VALUES (%s, %s, %s, %s, %s)
""",
(4, 'tianjin', '156.com','pon',145645)
)
session.execute("INSERT INTO emp (emp_id) VALUES (%s)", (5,)) #如果只传一个参数,用tuple的形式必须后面加“,”,或者用list的形式
session.execute("INSERT INTO emp (emp_id) VALUES (%s)", [6])
(2)名字传参
通常用这种方式传递数据,像keyspace名、表名、列名必须在开始就设定好。
session.execute(
"""
INSERT INTO emp (emp_id,emp_city,emp_email,emp_name,emp_phone)
VALUES (%(emp_id)s, %(emp_city)s, %(emp_email)s, %(emp_name)s, %(emp_phone)s)
""",{'emp_id': 7, 'emp_city': 'xian', 'emp_email': '777777.qq.com', 'emp_name': 'xiaoming', 'emp_phone': 55555})
- 关闭连接
cluster.shutdown()
cluster.is_shutdown #查看是否关闭连接
Out[5]: True
2. 批量插入数据
如果只是一条一条插入,会非常慢,我试了下,5万条数据大概需要4分钟,如果用batch批量插入数据,就可以非常快了,差不多1s插入1万条,是不是很爽,哈哈,赶紧试一下吧~
tic = time.time()
i=0
sql = 'BEGIN BATCH\n'
with open(r'C:\Users\admin\Desktop\output\cassandra116w.csv', 'r') as f:
while True:
line = f.readline().strip()
if (line == '' or line == np.nan):
if(sql != 'BEGIN BATCH\n'):
sql += 'APPLY BATCH;'
session.execute(sql)
break
ll = line.split(',')
sql += 'INSERT INTO lead2(name,current_title,current_company,location,id) VALUES (' + '\''+ll[0]+'\'' + ','+'\''+ll[1]+'\''+',' +'\''+ ll[2] +'\''+ ',' +'\''+ ll[3]+'\'' +',' +'\''+ ll[4] +'\''+');\n'
i=i+1
if (i>300):
sql += 'APPLY BATCH;'
session.execute(sql)
i=0
sql = 'BEGIN BATCH\n'
toc = time.time()
print('vectorized version:' +str((toc - tic)) +'s')
vectorized version:116.4514513015747s #插入116万条数据,用时116秒
网友评论