Day13 SQLAlchemy连表操作和堡垒机

时间:2023-03-08 19:33:56
Day13 SQLAlchemy连表操作和堡垒机

一、数据库操作

1、创建表、插入数据和一对多查询

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng
#
# One-to-many example: shared imports, database engine and declarative base.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

# NOTE(review): the database credentials are hard-coded in the URL; acceptable
# for a tutorial, but load them from configuration in real code.
engine = create_engine("mysql+pymysql://root:123456@192.168.100.188:3306/s13", max_overflow=5)

# Base class that every mapped table class below inherits from.
Base = declarative_base()

# Single table
class Test(Base):
    """Stand-alone table ``test`` with an auto-increment key and a name."""

    __tablename__ = 'test'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32))


# One-to-many
class Group(Base):
    """The "one" side of the one-to-many relation (table ``group``)."""

    __tablename__ = 'group'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    caption = Column(String(32))


class User(Base):
    """The "many" side of the relation (table ``user``); each user references one group."""

    __tablename__ = 'user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))
    # Foreign key to group.nid
    group_id = Column(Integer, ForeignKey("group.nid"))
    # Used for querying only; it does not create a column.
    # The backref adds a virtual attribute ``uuu`` on Group that lists its
    # users, e.g. [1 wang, 3 xiaoming, 4 xiaoxiao].
    # relationship() and ForeignKey are normally declared together.
    group = relationship("Group", backref='uuu')

    def __repr__(self):
        """Return ``nid - username - group_id``; only affects how the object prints."""
        # Alternative format:
        # "<nid=%s, username=%s, group_id=%s>" % (self.nid, self.username, self.group_id)
        return "%s - %s - %s" % (self.nid, self.username, self.group_id)


# Create tables
def init_db():
    """Create all tables registered on ``Base.metadata`` using ``engine``."""
    Base.metadata.create_all(engine)


def drop_db():
    """Drop all tables registered on ``Base.metadata`` using ``engine``."""
    Base.metadata.drop_all(engine)


init_db()
# Create groups
Session = sessionmaker(bind=engine)
session = Session()
session.add(Group(caption='dba'))
session.add(Group(caption='dbd'))
session.commit()

# Fetch users only
ret = session.query(User).filter(User.username == 'wang').all()
print(ret)

ret = session.query(User).all()
# This script never inserts users itself, so guard against an empty table
# instead of failing with IndexError on ret[0].
if ret:
    obj = ret[0]
    print(obj)
    print(obj.nid)
    print(obj.username)
    print(obj.group_id)
ret = session.query(User.username).all()
print(ret)

# Left join: isouter=True
# Select from both tables at once: session.query(User, Group)
sql = session.query(User, Group).join(Group, isouter=True)
# Only this second query object is printed; the assignment above is kept to
# show the whole-entity form.
sql = session.query(User.username, Group.caption).join(Group, isouter=True)
print(sql)
ret = session.query(User.username, Group.caption).join(Group, isouter=True).all()
# select * from user left join group on user.group_id = group.nid
print(ret)

# New style (forward lookup): the relationship lives on the table being queried
ret = session.query(User).all()
for obj in ret:
    # obj is one row of the user table; obj.group is the related Group object.
    # A user whose group_id is NULL has no related group, so do not
    # dereference it blindly.
    group = obj.group
    group_nid = group.nid if group is not None else None
    group_caption = group.caption if group is not None else None
    print(obj.nid, obj.username, obj.group_id, group, group_nid, group_caption)

# List everybody in a group
# ret = session.query(User.username, Group.caption).join(Group, isouter=True).filter(Group.caption == 'dba').all()
# print(ret)
# New style (reverse lookup): the relationship is declared on the other table
obj = session.query(Group).filter(Group.caption == 'dba').first()
print(obj.nid, obj.caption)
print(obj.uuu)

2、多对多关联

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng
#
# Many-to-many example: shared imports, database engine and declarative base.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

# NOTE(review): the database credentials are hard-coded in the URL; acceptable
# for a tutorial, but load them from configuration in real code.
engine = create_engine("mysql+pymysql://root:123456@192.168.100.188:3306/s13", max_overflow=5)

# Base class that every mapped table class below inherits from.
Base = declarative_base()

# Many-to-many
class Host(Base):
    """A managed machine (table ``host``)."""

    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))


class HostUser(Base):
    """A login account name (table ``host_user``)."""

    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


class HostToHostUser(Base):
    """Association table linking hosts to the accounts available on them."""

    __tablename__ = 'host_to_hostuser'
    # Surrogate key added so that individual links are easy to delete later
    nid = Column(Integer, primary_key=True, autoincrement=True)
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))


# Create tables
def init_db():
    """Create all tables registered on ``Base.metadata`` using ``engine``."""
    Base.metadata.create_all(engine)


def drop_db():
    """Drop all tables registered on ``Base.metadata`` using ``engine``."""
    Base.metadata.drop_all(engine)


init_db()

Session = sessionmaker(bind=engine)
session = Session()

# Insert rows in bulk
session.add_all([
    Host(hostname='c1', port='22', ip='1.2.1.2'),
    Host(hostname='c2', port='22', ip='1.2.1.3'),
    Host(hostname='c3', port='22', ip='1.2.1.1'),
    Host(hostname='c4', port='22', ip='1.2.1.4'),
    Host(hostname='c5', port='22', ip='1.2.1.5'),
])
session.commit()

session.add_all([
    HostUser(username='root'),
    HostUser(username='mysql'),
    HostUser(username='svn'),
    HostUser(username='git'),
    HostUser(username='oracle'),
])
session.commit()

# (host_id, host_user_id) pairs. Both columns are Integer, so plain ints are
# used for both ids (host_id was previously passed as a string).
# NOTE(review): assumes the rows inserted above received nids 1..5 in
# insertion order, i.e. the tables were empty beforehand - confirm.
host_user_links = [
    (1, 1), (1, 2), (1, 3),
    (2, 4), (2, 5), (2, 1),
    (3, 1), (3, 2), (3, 3),
    (4, 4), (4, 5), (4, 1),
    (5, 4), (5, 5), (5, 1),
]
session.add_all([
    HostToHostUser(host_id=host_id, host_user_id=host_user_id)
    for host_id, host_user_id in host_user_links
])
session.commit()

# Many-to-many data operations
# Get all users on host 1
host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
# host_obj.nid is the id of the matched host
host_2_host_user = session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == host_obj.nid).all()
print(host_2_host_user)
# [(1,), (2,), (3,)]
# Flatten the one-column rows into a plain list of ids. Unlike
# list(zip(*rows))[0], this does not raise IndexError when no link rows exist.
user_ids = [row[0] for row in host_2_host_user]
# [1, 2, 3]
users = session.query(HostUser.username).filter(HostUser.nid.in_(user_ids)).all()
print(users)

3、多对多查询第一种方式

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng
#
# Many-to-many query, approach 1: shared imports, engine and declarative base.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

# NOTE(review): the database credentials are hard-coded in the URL; acceptable
# for a tutorial, but load them from configuration in real code.
engine = create_engine("mysql+pymysql://root:123456@192.168.100.188:3306/s13", max_overflow=5)

# Base class that every mapped table class below inherits from.
Base = declarative_base()

# Many-to-many
class Host(Base):
    """A managed machine (table ``host``)."""

    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))


class HostUser(Base):
    """A login account name (table ``host_user``)."""

    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


class HostToHostUser(Base):
    """Association table with relationships to both sides of the link."""

    __tablename__ = 'host_to_hostuser'
    # Surrogate key added so that individual links are easy to delete later
    nid = Column(Integer, primary_key=True, autoincrement=True)
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))

    # backref 'h' exposes Host.h: the link rows that belong to a host
    host = relationship("Host", backref='h')
    # backref 'u' exposes HostUser.u: the link rows that belong to an account
    host_user = relationship("HostUser", backref='u')


Session = sessionmaker(bind=engine)
session = Session()

# Get all users on host 1
# Host
host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
# host_obj.h holds the host_to_hostuser link objects for this host
# print(host_obj.h)
for item in host_obj.h:
    # Each link object exposes its account through the host_user relationship
    print(item.host_user, item.host_user.nid, item.host_user.username)

4、多对多查询第二种方式

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng
#
# Many-to-many query, approach 2: shared imports, engine and declarative base.
from sqlalchemy import create_engine, and_, or_, func, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, DateTime
from sqlalchemy.orm import sessionmaker, relationship

# NOTE(review): the database credentials are hard-coded in the URL; acceptable
# for a tutorial, but load them from configuration in real code.
engine = create_engine("mysql+pymysql://root:2008bjAY@192.168.145.130:3306/s13", max_overflow=5)

# Base class that every mapped table class below inherits from.
Base = declarative_base()

# Many-to-many
# Plain association Table (no mapped class); the two foreign keys together
# form the composite primary key.
HostToHostUser = Table(
    'host_to_hostuser', Base.metadata,
    Column('host_id', ForeignKey('host.nid'), primary_key=True),
    Column('host_user_id', ForeignKey('host_user.nid'), primary_key=True),
)


class Host(Base):
    """A managed machine (table ``host``)."""

    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))

    # Accounts reachable through the association table; the backref 'h'
    # exposes HostUser.h with the hosts of an account.
    host_user = relationship('HostUser',
                             secondary=HostToHostUser,
                             backref='h')


class HostUser(Base):
    """A login account name (table ``host_user``)."""

    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


Session = sessionmaker(bind=engine)
session = Session()

# Host
host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
# host_obj.host_user yields HostUser objects directly via the secondary table
for item in host_obj.host_user:
    print(item.username)

5、多对多查询第三种方式

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng
#
# Many-to-many query, approach 3: shared imports, engine and declarative base.
from sqlalchemy import create_engine, and_, or_, func, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, DateTime
from sqlalchemy.orm import sessionmaker, relationship

# NOTE(review): the database credentials are hard-coded in the URL; acceptable
# for a tutorial, but load them from configuration in real code.
engine = create_engine("mysql+pymysql://root:2008bjAY@192.168.145.130:3306/s13", max_overflow=5)

# Base class that every mapped table class below inherits from.
Base = declarative_base()

# Many-to-many
class Host(Base):
    """A managed machine (table ``host``)."""

    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))

    # HostToHostUser is defined further down, so referencing
    # HostToHostUser.__table__ directly here would raise NameError while this
    # class body executes. Passing a callable defers the lookup until the
    # mappers are configured, by which time the class exists.
    host_user = relationship("HostUser",
                             secondary=lambda: HostToHostUser.__table__,
                             backref='u')


class HostUser(Base):
    """A login account name (table ``host_user``)."""

    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


class HostToHostUser(Base):
    """Association table, mapped as a class and used as ``secondary`` by Host."""

    __tablename__ = 'host_to_hostuser'
    # Surrogate key added so that individual links are easy to delete later
    nid = Column(Integer, primary_key=True, autoincrement=True)
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))


Session = sessionmaker(bind=engine)
session = Session()

# Host
host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
# host_obj.host_user yields HostUser objects directly via the secondary table
for item in host_obj.host_user:
    print(item.username)

二、Paramiko

1、使用内置的transport连接

import paramiko

# SSHClient creates and manages its own transport internally.
ssh = paramiko.SSHClient()
# Accept unknown host keys automatically.
# NOTE(review): convenient for a demo, but it disables host-key verification.
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
    ssh.connect(hostname='192.168.145.130', port=22, username='scm', password='1q2w3e4R')
    stdin, stdout, stderr = ssh.exec_command('ls')
    result = stdout.read()
    print(result)
finally:
    # Always release the connection, even if connect/exec_command fails.
    ssh.close()

2、自定义transport

import paramiko

# Build and authenticate the transport by hand, then attach it to a client.
transport = paramiko.Transport(('192.168.145.130', 22))
try:
    transport.connect(username='scm', password='1q2w3e4R')
    ssh = paramiko.SSHClient()
    # NOTE(review): _transport is a private attribute of SSHClient; this is
    # how the tutorial reuses an existing transport.
    ssh._transport = transport
    stdin, stdout, stderr = ssh.exec_command('df')
    result = stdout.read()
    print(result)
finally:
    # Close the transport even when authentication or the command fails.
    transport.close()

3、SFTP

import paramiko

transport = paramiko.Transport(('192.168.145.130', 22))
try:
    transport.connect(username='scm', password='1q2w3e4R')
    sftp = paramiko.SFTPClient.from_transport(transport)
    # Upload: local path first, remote path second
    sftp.put('D:\\Study\\cnblog.txt', '/tmp/cn.txt')
    # Download: remote path first, local path second (placeholder paths)
    sftp.get('remote_path', 'local_path')
finally:
    # Close the transport even when a transfer fails.
    transport.close()

三、堡垒机实现

1、简单实例

注意:需要在堡垒机登录用户的 ~/.bashrc 末尾添加如下两行,使用户登录后自动运行 bridge_server.py,脚本退出后立即注销:

vim .bashrc
/usr/bin/python3 bridge_server.py
logout
bridge_server.py代码如下:
import paramiko
import sys
import os
import socket
import getpass

try:
    from paramiko.py3compat import u
except ImportError:
    # NOTE(review): newer paramiko releases appear to have dropped
    # paramiko.py3compat and to expose u() from paramiko.util - confirm
    # against the installed version.
    from paramiko.util import u

# windows does not have termios...
try:
    import termios
    import tty
    has_termios = True
except ImportError:
    has_termios = False


def interactive_shell(chan):
    """Run an interactive session on ``chan`` using the platform's terminal handling."""
    if has_termios:
        posix_shell(chan)
    else:
        windows_shell(chan)


def posix_shell(chan):
    """Relay between the local terminal and ``chan``, logging typed commands.

    Keystrokes are collected in a buffer and appended to ``handle.log`` each
    time Enter (``'\\r'``) is pressed. After a Tab keystroke, the next chunk
    received from the remote side is added to the buffer as well (unless it
    starts with CRLF), so a tab completion ends up in the logged command.
    The terminal attributes are restored on exit.
    """
    import select

    oldtty = termios.tcgetattr(sys.stdin)
    try:
        tty.setraw(sys.stdin.fileno())
        tty.setcbreak(sys.stdin.fileno())
        chan.settimeout(0.0)
        flag = False      # True right after the user pressed Tab
        temp_list = []    # pieces of the command line typed so far
        # The context manager closes the log file when the session ends.
        with open('handle.log', 'a+', encoding='utf-8') as log:
            while True:
                r, w, e = select.select([chan, sys.stdin], [], [])
                if chan in r:
                    try:
                        x = u(chan.recv(1024))
                        if not x:
                            sys.stdout.write('\r\n*** EOF\r\n')
                            break
                        if flag:
                            # Output starting with CRLF is not recorded; any
                            # other reply to Tab is treated as completion text.
                            if not x.startswith('\r\n'):
                                temp_list.append(x)
                            flag = False
                        sys.stdout.write(x)
                        sys.stdout.flush()
                    except socket.timeout:
                        pass
                if sys.stdin in r:
                    x = sys.stdin.read(1)
                    if not x:
                        break

                    if x == '\t':
                        flag = True
                    else:
                        temp_list.append(x)
                    if x == '\r':
                        log.write(''.join(temp_list))
                        log.flush()
                        temp_list.clear()
                    chan.send(x)
    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)


def windows_shell(chan):
    """Line-buffered fallback used when termios is unavailable.

    A background thread copies remote output to stdout while the calling
    thread forwards stdin to ``chan`` one character at a time.
    """
    import threading

    sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")

    def writeall(sock):
        while True:
            data = sock.recv(256)
            if not data:
                sys.stdout.write('\r\n*** EOF ***\r\n\r\n')
                sys.stdout.flush()
                break
            # Decode before writing, as posix_shell does; writing the raw
            # bytes to the text-mode stdout raises TypeError on Python 3.
            sys.stdout.write(u(data))
            sys.stdout.flush()

    writer = threading.Thread(target=writeall, args=(chan,))
    writer.start()

    try:
        while True:
            d = sys.stdin.read(1)
            if not d:
                break
            chan.send(d)
    except EOFError:
        # user hit ^Z or F6
        pass


def run():
    """Prompt for login details, authenticate and start an interactive shell.

    Asks for user name, host name and authentication method (password or RSA
    key), connects to port 22 of the host, and hands the channel to
    ``interactive_shell``. Exits with status 1 when no host name is given.
    """
    default_username = getpass.getuser()
    username = input('Username [%s]: ' % default_username)
    if not username:
        username = default_username

    hostname = input('Hostname: ')
    if not hostname:
        print('*** Hostname required.')
        sys.exit(1)

    tran = paramiko.Transport((hostname, 22,))
    try:
        tran.start_client()

        default_auth = "p"
        auth = input('Auth by (p)assword or (r)sa key[%s] ' % default_auth)
        if not auth:
            auth = default_auth

        if auth == 'r':
            # expanduser('~') also works where HOME is not set (e.g. Windows),
            # where os.environ['HOME'] would raise KeyError.
            default_path = os.path.join(os.path.expanduser('~'), '.ssh', 'id_rsa')
            path = input('RSA key [%s]: ' % default_path)
            if not path:
                path = default_path
            try:
                key = paramiko.RSAKey.from_private_key_file(path)
            except paramiko.PasswordRequiredException:
                password = getpass.getpass('RSA key password: ')
                key = paramiko.RSAKey.from_private_key_file(path, password)
            tran.auth_publickey(username, key)
        else:
            pw = getpass.getpass('Password for %s@%s: ' % (username, hostname))
            tran.auth_password(username, pw)

        # Open a channel
        chan = tran.open_session()
        try:
            # Request a pseudo-terminal
            chan.get_pty()
            # Start the remote shell
            chan.invoke_shell()

            interactive_shell(chan)
        finally:
            chan.close()
    finally:
        # Close the transport even when authentication or the session fails.
        tran.close()


if __name__ == '__main__':
    run()