first init project code

This commit is contained in:
dongxiaoyan
2020-04-01 12:42:05 +08:00
commit acc676857b
138 changed files with 32456 additions and 0 deletions

View File

@@ -0,0 +1,40 @@
# 由于MD5模块在python3中被移除
# 在python3中使用hashlib模块进行md5操作
import hashlib
class MD5:
def MD5(data,langer,md5_types):
# 创建md5对象
# m = hashlib.md5()
# Tips
# 此处必须encode
# 若写法为m.update(str) 报错为: Unicode-objects must be encoded before hashing
# 因为python3里默认的str是unicode
# 或者 b = bytes(str, encoding='utf-8')作用相同都是encode为bytes
# b = str.encode(encoding='utf-8')
# m.update(b)
# str_md5 = m.hexdigest()
if langer == "英文":
# str_md5 = hashlib.md5(b'this is a md5 test.').hexdigest()
str_md5 = hashlib.md5("b'"+data+"'").hexdigest()
print('MD5加密前为 ' + data)
print('MD5加密后为 ' + str_md5)
return str_md5
elif langer == "中文":
str_md5 = hashlib.md5('你好'.encode(encoding=md5_types)).hexdigest()
return str_md5
# utf8 和gbk 加密结构不一样
# hashlib.md5('你好'.encode(encoding='GBK')).hexdigest()
# hashlib.md5('你好'.encode(encoding='GB2312')).hexdigest()
# hashlib.md5('你好'.encode(encoding='GB18030')).hexdigest()
if __name__ == '__main__':
data = '小猪'
langer = '中文'
md5_types = 'GBK'
a =MD5(data,langer,md5_types)
print(a)
b=r'C:\Users\小猪\AppData\Local\Programs\Python\Python37\Lib\site-packages\custometest\MD5.py'
with open(b, encoding='utf-8') as f:
text = f.read()
print(text)

View File

@@ -0,0 +1,16 @@
#-*- coding:utf-8 -*-
'''
created by hch 2019-06-26
'''
from custometest.printlog import printlog
from custometest.MD5 import MD5
from custometest.cmd_cer import Order
# from custometest.printlog import printlog
__version__ = '1.0'
class custometest(printlog,Order,MD5):
ROBOT_LIBRARY_SCOPE = 'GLOBAL'

View File

@@ -0,0 +1,164 @@
import os
import subprocess
from time import sleep
class Order:
def CMD(self,data):
result = os.popen(data)
# res = result.read().encoding('GBK')
res = result.read()
result.close()
# res = res.decode("unicode-escape")
return res
def Linux(self):
pass
# 根据证书颁发者名字判断证书是否替换
def Cert_Verification(self,data):
c = []
print(1)
#with open(r'C:\Users\iiesoft\AppData\Local\Programs\Python\Python36\Lib\site-packages\custometest\certificate.yaml', 'r') as foo:
with open(r'certificate.yaml', 'r') as foo:
print(2)
for line in foo.readlines():
if data in line:
print(line)
c.append('证书已替换')
else:
pass
if '证书已替换' in c:
# print('证书已替换')
foo.close()
return '证书已替换'
else:
# print('证书未替换')
foo.close()
return '证书未替换'
def Content_Type(self,data):
d = []
with open('certificate.yaml', 'r') as foo:
for line in foo.readlines():
if data in line:
# print(line)
d.append('Content_Type已替换')
else:
pass
if 'Content_Type已替换' in d:
# print('证书已替换')
foo.close()
return 'Content_Type已替换'
else:
# print('证书未替换')
foo.close()
return 'Content_Type未替换'
# curl路由内容设置
def curl_name(self,data):
#curl_name = 'curl -kv -m 10 -1 --trace C:/Users/iiesoft/AppData/Local/Programs/Python/Python36/Lib/site-packages/custometest/certificate.yaml '+data+'| iconv -f utf-8 -t gbk'
curl_name = 'curl -kv -m 10 -1 --trace certificate.yaml '+data+'| iconv -f utf-8 -t gbk'
return curl_name
# 控制器
def manu(self,url,Certificate):
# print(data['url'])
n = 0
while n != len(url):
b = self.curl_name(url[n])
d = self.CMD(b)
# print(d)
sleep(1)
if Certificate != "":
c =self.Cert_Verification(Certificate)
# f = self.Content_Type(data["Content_Type"])
sleep(1)
assert_cer = url[n]+c
# assert_Content_Type = data['Content_Type']+f
n+=1
return d,assert_cer
def FTP(self, ftp_type):
windows_path = os.getcwd()
linux_path = os.getcwd().replace('\\', '/')
# 判断FTP执行类型下载/登录)
if ftp_type == "下载":
# 调用cmd执行FTP下载文件
data = 'curl -m 20 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:chrome@example.com" -o zmmtext123.txt'
d = self.CMD(data)
sleep(5)
fsize = os.path.getsize(linux_path + "/zmmtext123.txt") # 435814
if fsize == 435814:
return "Success"
else:
return "Fail"
elif ftp_type == "登录":
data = 'curl -m 10 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:chrome@example.com" | iconv -f utf-8 -t gbk'
d = self.CMD(data)
# print(d)
if "Graphical (Xorg) program starter for ADRIANE" in d:
return "Success"
else:
return "Fail"
# FTP 下载
def FTP_down(self, ftp_url,file_size,fiel_name):
windows_path = os.getcwd()
linux_path = os.getcwd().replace('\\', '/')
# 判断FTP执行类型下载/登录)
# 调用cmd执行FTP下载文件
data = 'curl -m 20 '+ftp_url+ '-o '+ fiel_name + " ' "
print(data)
d = self.CMD(data)
sleep(5)
fsize = os.path.getsize(linux_path + "/"+fiel_name) # 435814
print(fsize)
if fsize == file_size:
return "Success"
else:
return "Fail"
# FTP 登录
def FTP_login(self, ftp_url,file_content):
data = 'curl -m 10 '+ftp_url+' | iconv -f utf-8 -t gbk'
d = self.CMD(data)
# print(d)
if file_content in d:
return "Success"
else:
return "Fail"
if __name__ == '__main__':
datas = {"url":['https://www.baidu.com'],
"Certificate":"Tango Secure Gateway CA",
# "Content_Type":"text/html",
'log':'Security Event Logs',
"sni":['baidu'],
"intercept_code":"200",
"log_code":"200",
"certifucate":"1",
"log_content":"true"
}
# data= {"url":['https://www.baidu.com'],
# "Certificate":"Tango Secure Gateway CA"
# }
# url = ['https://www.baidu.com']
# url = ['https://www.baidu.com']
# url = ['https://www.baidu.com']
# # Certificate1 = "Tango Secure Gateway CA"
# Certificate = "baidu"
# a='Tango Secure Gateway CA'
# s = Order()
# b = s.manu(url,Certificate)
# print(b[1])
# FTP下载 传入ftp的路径和文件大小
ftp_url = 'ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:chrome@example.com" '
ftp_size = 435814
ftp_issue = s.FTP_down(ftp_url,ftp_size)
# FTP登录 传入ftp的路径和文件内容
ftp_url ='ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:chrome@example.com" '
file_content = "Graphical (Xorg) program starter for ADRIANE"
ftp_issue = s.FTP_login(ftp_url,file_content)
# for i in b:
# print(i)
# dd = s.CMD('curl -I https://www.baidu.com')
# print(dd)
# if "private, no-cache, no-store, proxy-revalidate, no-transform"in dd:
# print("ok")
# a ='curl -kv -1 --trace certificate.yaml https://www.baidu.com | iconv -f utf-8 -t gbk'

View File

@@ -0,0 +1,11 @@
#-*- coding:utf-8 -*-
'''
created by hch 2019-06-26
'''
class printlog():
def printA():
print("hello word")

View File

@@ -0,0 +1,7 @@
#coding=utf-8
from filetool import filetool
version = '1.0'
class FileLibrary(filetool):
ROBOT_LIBRARY_SCOPE = 'GLOBAL'

View File

@@ -0,0 +1,27 @@
#coding=utf-8
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
""" 变量文件操作 """
class filetool():
def __init__(self):
pass
def alter_dict(self, path, k, v):
data = ''
flag = True
key = '${%s}' % (k)
add = key + '\t%s' % (v) + '\n'
with open(path, 'r+') as f:
for line in f.readlines():
if(line.find(key + '\t') == 0):
line = add
flag = False
data += line
if(flag):
data += add
print data
with open(path, 'w+') as f:
f.writelines(data)

View File

@@ -0,0 +1,200 @@
import poplib
import base64
import time
from email.parser import Parser
# 用来解析邮件主题
from email.header import decode_header
# 用来解析邮件来源
from email.utils import parseaddr
from robot.api.deco import keyword
from robot.api import logger
class AcceptEmail(object):
def __init__(self, user_email, password, pop3_server='serverDemon'):
self.user_email = user_email
self.password = password
self.pop3_server = pop3_server
self.connect_email_server()
def connect_email_server(self):
self.server = poplib.POP3(self.pop3_server)
# 打印POP3服务器的欢迎文字验证是否正确连接到了邮件服务器
# print('连接成功 -- ', self.server.getwelcome().decode('utf8'))
# +OK QQMail POP3 Server v1.0 Service Ready(QQMail v2.0)
# 开始进行身份验证
self.server.user(self.user_email)
self.server.pass_(self.password)
def __del__(self):
# 关闭与服务器的连接,释放资源
self.server.close()
def get_email_count(self):
# 返回邮件总数目和占用服务器的空间大小(字节数), 通过stat()方法即可
email_num, email_size = self.server.stat()
# print("消息的数量: {0}, 消息的总大小: {1}".format(email_num, email_size))
return email_num
def receive_email_info(self, now_count=None):
# 返回邮件总数目和占用服务器的空间大小(字节数), 通过stat()方法即可
email_num, email_size = self.server.stat()
# print("消息的数量: {0}, 消息的总大小: {1}".format(email_num, email_size))
self.email_count = email_num
self.email_sumsize = email_size
# 使用list()返回所有邮件的编号,默认为字节类型的串
rsp, msg_list, rsp_siz = self.server.list()
# print(msg_list, '邮件数量',len(msg_list))
# print("服务器的响应: {0},\n消息列表 {1},\n返回消息的大小 {2}".format(rsp, msg_list, rsp_siz))
# print('邮件总数: {}'.format(len(msg_list)))
self.response_status = rsp
self.response_size = rsp_siz
# 下面获取最新的一封邮件,某个邮件下标(1开始算)
# total_mail_numbers = len(msg_list)
# 动态取消息
total_mail_numbers = now_count
rsp, msglines, msgsiz = self.server.retr(total_mail_numbers)
# rsp, msglines, msgsiz = self.server.retr(1)
# print("服务器的响应: {0},\n原始邮件内容 {1},\n该封邮件所占字节大小 {2}".format(rsp, msglines, msgsiz))
# 从邮件原内容中解析
msg_content = b'\r\n'.join(msglines).decode('utf-8')#gbk
msg = Parser().parsestr(text=msg_content)
self.msg = msg
# print('解码后的邮件信息:\n{}'.format(msg))
def recv(self, now_count=None):
self.receive_email_info(now_count)
self.parser()
def get_email_title(self):
subject = self.msg['Subject']
value, charset = decode_header(subject)[0]
if charset:
value = value.decode(charset)
# print('邮件主题: {0}'.format(value))
self.email_title = value
def get_sender_info(self):
hdr, addr = parseaddr(self.msg['From'])
# name 发送人邮箱名称, addr 发送人邮箱地址
name, charset = decode_header(hdr)[0]
if charset:
name = name.decode(charset)
self.sender_qq_name = name
self.sender_qq_email = addr
# print('发送人邮箱名称: {0},发送人邮箱地址: {1}'.format(name, addr))
def get_email_content(self):
content = self.msg.get_payload()
# 文本信息
content_charset = content[0].get_content_charset() # 获取编码格式
text = content[0].as_string().split('base64')[-1]
text_content = base64.b64decode(text).decode(content_charset) # base64解码
self.email_content = text_content
# print('邮件内容: {0}'.format(text_content))
# 添加了HTML代码的信息
content_charset = content[1].get_content_charset()
text = content[1].as_string().split('base64')[-1]
# html_content = base64.b64decode(text).decode(content_charset)
# print('文本信息: {0}\n添加了HTML代码的信息: {1}'.format(text_content, html_content))
def parser(self):
self.get_email_title()
self.get_sender_info()
#self.get_email_content()
def get_new_mail(user_name, pwd, pop3_server, second=5):
t = AcceptEmail(user_name, pwd, pop3_server)
now_count = t.get_email_count()
#print('开启的时候的邮件数量为:%s' % now_count)
logger.info("开启的时候的邮件数量为:"+str(now_count))
# 每次需要重新连接邮箱服务器,才能获取到最新的消息
# 默认每隔5秒看一次是否有新内容
num = 0
while True:
obj = AcceptEmail(user_name, pwd, pop3_server)
count = obj.get_email_count()
if count > now_count:
new_mail_count = count - now_count
#print('有新的邮件数量:%s' % new_mail_count)
logger.info("有新的邮件数量:"+str(new_mail_count))
now_count += 1
obj.recv(now_count)
yield {"title": obj.email_title, "sender": obj.sender_qq_name, "sender_email": obj.sender_qq_email}
#yield {"title": obj.email_title, "sender": obj.sender_qq_name, "sender_email": obj.sender_qq_email,
# "email_content": obj.email_content}
if new_mail_count > 0:
return
# print('-' * 30)
# print("邮件主题:%s\n发件人:%s\n发件人邮箱:%s\n邮件内容:%s" % (
# obj.email_title, obj.sender_qq_name, obj.sender_qq_email, obj.email_content))
# print('-' * 30)
#else:
#print('没有任何新消息.')
#logger.info("没有任何新消息.")
time.sleep(second)
num += 1
if num == 36:#等待时间粒度一个粒度是5s如果num=10意思就是等待接收邮件50s若没有邮件就返回;36是等待3min
return
@keyword('Recv Email')
def recv_email(user_name, pwd, pop3_server, send_user, subj):
'''
参数说明:
[user_name]:用户名
[pwd]:密码,第三方登入密码
[pop server]pop服务器
[sender]:发送者邮箱,注意全称
[subj_sub]:主题的部分内容,这里是测主题是否包含该参数
[return]返回值成功返回success失败返回n其他
其他问题请阅读该库的readme.txt
'''
dic = {}
logger.info("正在监听邮件服务器端是否有新消息---")
#print('正在监听邮件服务器端是否有新消息---')
try:
iterator = get_new_mail(user_name, pwd, pop3_server)
except TypeError:
#print('监听的内容有误,有图片数据等,无法解析而报错,不是纯文本内容')
logger.info("监听的内容有误,有图片数据等,无法解析而报错,不是纯文本内容")
return "fail"
else:
for dic in iterator:
#print("邮件主题:%s\n发件人:%s\n发件人邮箱:%s\n邮件内容:%s" % (
# dic["title"], dic["sender"], dic["sender_email"], dic["email_content"]))
#logger.info("邮件主题: " + str(dic["title"]) + " 发件人: " + str(dic["sender"])+"发件人邮箱:"+str(dic["sender_email"]))
if dic["sender"] == send_user:
#logger.info("发送者一样")
if subj in dic["title"]:
#logger.info("主题也包含")
return "success"
else:
#logger.info("主题不包含")
return "fail"
else:
return "fail"
#if __name__ == '__main__':
# user = 'xxxx@qq.com'
# pwd = 'xxxx'
# pop3_server = 'pop.qq.com'
# send_user = 'zlking_2019@163.com'
# subj = 'or2020'
# result = recv_email(user, pwd, pop3_server, send_user, subj)
# print(result)

View File

@@ -0,0 +1,26 @@
导入方法:
1.将该目录放到....\Python\Lib\site-packages 下
2.在测试夹具里面要根据绝对路径导入此包
注意:
1.使用该包的关键字时不要用qq邮箱qq邮箱测试发现时长会失灵现象最好用163邮箱等
2.注意关闭邮箱的加密传送方式SSL协议
3.注意邮箱是否支持POP3的协议以及不同邮箱pop服务器的写法
关键字:
[return] Recv Email [user_name] [pwd] [pop server] [sender] [subj_sub]
参数说明:
[user_name]:用户名
[pwd]:密码,第三方登入密码
[pop server]pop服务器
[sender]:发送者邮箱,注意全称
[subj_sub]:主题的部分内容,这里是测主题是否包含该参数
[return]返回值成功返回success失败返回n其他
该关键字默认等待3min3min内每5s检测邮箱是否收到邮件
若收到邮件与[sender]参数进行完全匹配,与[subj_sub]部分主题内容参数进行是否包含匹配匹配成功则返回success
若收到邮件匹配失败或者超时则返回其他,
若修改等待时间可查找源码中的num变量将36改为其他值即可注意时间粒度是5s若num=3则您修改的等待时间是15s

View File

@@ -0,0 +1,417 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This file is part of robotframework-Smtp3Library.
# https://github.io/lucamaro/robotframework-Smtp3Library
# Licensed under the Apache License 2.0 license:
# http://www.opensource.org/licenses/Apache-2.0
# Copyright (c) 2016, Luca Maragnani <luca.maragnani@gmail.com>
"""
Library implementation
"""
import ast
import smtplib
import email
import random
import string
import mimetypes
import quopri
from email.message import Message
from email.mime.audio import MIMEAudio
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email import encoders
import os.path
import socket
from robot.api.deco import keyword
from robot.api import logger
from Smtp3Library.version import __version__ # NOQA
COMMASPACE = ', '
class Smtp3Library(object):
"""
SMTP Client class
"""
def __init__(self):
"""
Constructor
"""
self.message = self._MailMessage()
self.host = None
self.port = None
self.user = None
self.password = None
self.smtp = None
def _prepare_connection(self, host, port, user=None, password=None):
"""
Private method to collect connection informations
"""
self.host = host
self.port = int(port)
self.user = user
self.password = password
self.client_hostname = socket.gethostname()
def prepare_ssl_connection(self, host, port=465, user=None, password=None):
"""
Collect connection informations for SSL channel
"""
self._prepare_connection(host, port, user, password)
self.smtp = smtplib.SMTP_SSL()
def prepare_connection(self, host, port=25, user=None, password=None):
"""
Collect connection informations for unencrypted channel
"""
self._prepare_connection(host, port, user, password)
self.smtp = smtplib.SMTP()
def add_to_recipient(self, recipient):
"""
Add a recipient to "To:" list
"""
self.message.mail_to.append(recipient)
def add_cc_recipient(self, recipient):
"""
Add a recipient to "Cc:" list
"""
self.message.mail_cc.append(recipient)
def add_bcc_recipient(self, recipient):
"""
Add a recipient to "Bcc:" list
"""
self.message.mail_bcc.append(recipient)
def set_subject(self, subj):
"""
Set email subject
"""
self.message.subject = subj
def set_from(self, from_recipient):
"""
Set from address of message and envelope
"""
self.message.mail_from = from_recipient
def set_body(self, body):
"""
Set email body
"""
self.message.body = body
def set_random_body(self, size):
"""
Set a random body of <size> length
"""
body = ''
for i in range(0, size):
body += ''.join(random.choice(string.uppercase + string.digits))
if i % 80 == 0:
body += "\n"
self.message.body = body
def add_attachment(self, attach):
"""
Add attachment to a list of filenames
"""
self.message.attachments.append(attach)
def add_header(self, name, value):
"""
Add a custom header to headers list
"""
self.message.headers[name] = value
def connect(self):
'''
Open connection to server
Returns tuple (smtp status code, message)
'''
return self.smtp.connect(self.host, self.port)
def present_client_as(self, client_hostname):
'''
Set helo/ehlo client identity
'''
self.client_hostname = client_hostname
def helo(self):
'''
Send HELO command
Returns tuple (smtp status code, message)
'''
result = self.smtp.helo(self.client_hostname)
logger.info(result)
return result
def ehlo(self):
'''
Send EHLO command
Returns tuple (smtp status code, message)
'''
result = self.smtp.ehlo(self.client_hostname)
logger.info(result)
return result
def get_esmtp_features(self):
'''
Returns hashmap with ESMTP feature received with EHLO
'''
logger.info(self.smtp.esmtp_features)
return self.smtp.esmtp_features
def logins(self):
try:
'''
Login user
Returns tuple (smtp status code, message)
'''
logger.info("Login with user " + self.user + " and password " + self.password)
'''try:
subuser=bytes.decode(self.user)
subpassword=bytes.decode(self.password)
result = self.smtp.login(subuser.encode('ascii'), subpassword.encode('ascii'))
logger.info(result)
return result
except:
logger.info("本身就是str类型不需要bytes to str")
subuser=str(self.user).encode('ascii')
subpassword=str(self.password).encode('ascii')
result = self.smtp.login(subuser, subpassword)
logger.info(result)
return result'''
result = self.smtp.login(self.user, self.password)
logger.info(result)
return "success"
except:
return "fail"
def starttls(self, keyfile=None, certfile=None):
'''
sends STARTTLS
optional: keyfile certfile
Returns tuple (smtp status code, message)
'''
logger.info("STARTTLS")
if keyfile is None and certfile is None:
result = self.smtp.starttls()
else:
result = self.smtp.starttls(keyfile, certfile)
logger.info(result)
return result
def data(self):
'''
Data command send email body with "MAIL FROM:", "RCPT TO:" and "DATA" commands
Returns tuple (smtp status code, message)
'''
result = self.smtp.mail(self.message.mail_from)
result += self.smtp.rcpt(self.message.get_message_recipients())
result += self.smtp.data(self.message.get_message_as_string())
logger.info(result)
return result
def sendmail(self):
'''
Send email with "MAIL FROM:", "RCPT TO:" and "DATA" commands
Returns tuple (smtp status code, message)
'''
result = self.smtp.sendmail(self.message.mail_from, self.message.get_message_recipients(), self.message.get_message_as_string())
logger.info(result)
return result
def quit(self):
'''
Send QUIT command
Returns tuple (smtp status code, message)
'''
result = self.smtp.quit()
logger.info(result)
return result
def close_connection(self):
'''
Close connection to server
'''
return self.smtp.close()
def send_message(self):
"""
Send the message, from connection establishment to quit and close connection.
All the connection and email parameters must be already set before invocation.
Returns sendmail response (code, message)
"""
# Send the message
try:
self.connect()
if self.user is not None:
self.ehlo()
self.logins()
send_result = self.sendmail()
self.quit()
self.close_connection()
# return send_result
return "success"
except:
return "fail"
@keyword('Send Message With All Parameters')
def send_message_full(self, host, user, password, subj,
from_recipient, to_recipient, cc_recipient=None, bcc_recipient=None,
body=None, attach=None):
"""
Send a message specifing all parameters on the same linecc
cc, bcc and attach parameters may be strings or array of strings
host, user, password, subj, fromadd, toadd - are mandatory parameters
to use the optional paramaters pleas specify the name fo the parameter in the call
user and password even if mandatory could be set to None so no authentication will be made
Example:
sendMail("smtp.mail.com", None, None, "The subject", "me@mail.com", "friend@mai.com", body="Hello World body")
sendMail("smtp.mail.com", "scott", "tiger", "The subject", "me@mail.com", "friend@mai.com", body="Hello World body", attach=attaches
where could be:
attaches = ["c:\\desktop\\file1.zip", "c:\\desktop\\file2.zip"] or
attaches = "c:\\desktop\\file1.zip"
Returns sendmail response (code, message)
"""
self.host = host
self.user = user
self.password = password
self.set_subject(subj)
self.set_from(from_recipient)
self.message.mail_to = to_recipient
if cc_recipient != None:
self.message.mail_cc = cc_recipient
if bcc_recipient != None:
self.message.mail_bcc = bcc_recipient
#Fill the message
if body != None:
self.set_body(body)
# Part two is attachment
if attach != None:
attachlist = ast.literal_eval(attach)
self.message.attachments = attachlist
#logger.info("self.message.attachments:"+str(type(self.message.attachments)))
#logger.info("attachtype:"+str(type(attachlist)))
#logger.info("attachlist:"+str(attachlist))
return self.send_message()
class _MailMessage:
"""
Simplified email message
This class represent email headers and payload content, not envelope data
"""
def __init__(self):
"""
init object variables
"""
self.mail_from = None
self.mail_to = []
self.mail_cc = []
self.mail_bcc = []
self.subject = ''
self.body = ''
self.attachments = []
self.headers = {}
def get_message_recipients(self):
'''
Get all message recipients (to, cc, bcc)
'''
recipients = []
tolist = ast.literal_eval(self.mail_to)
cclist = ast.literal_eval(self.mail_cc)
bcclist = ast.literal_eval(self.mail_bcc)
recipients.extend(tolist)
recipients.extend(cclist)
recipients.extend(bcclist)
#logger.info("recipientslist:"+str(recipients))
return recipients
def get_message_as_string(self):
'''
Get message as string to be sent with smtplib.sendmail api
'''
if len(self.attachments) > 0:
#logger.info("attachments:"+str(self.attachments))
#logger.info("attachmentstype:"+str(type(self.attachments)))
#logger.info("attachmentsnum:"+str(len(self.attachments)))
envelope = MIMEMultipart()
envelope.attach(MIMEText(self.body))
else:
envelope = MIMEText(self.body)
recipients = self.get_message_recipients()
tolist = ast.literal_eval(self.mail_to)
cclist = ast.literal_eval(self.mail_cc)
envelope['From'] = self.mail_from
envelope['To'] = COMMASPACE.join(tolist)
envelope['Cc'] = COMMASPACE.join(cclist)
envelope['Subject'] = self.subject
#logger.info("envelope111:"+str(self.attachments))
for attachment in list(self.attachments):
ctype, encoding = mimetypes.guess_type(attachment)
#logger.info("attachment:"+attachment+" ctype:"+str(ctype)+" encoding:"+str(encoding))
if ctype is None or encoding is not None:
# No guess could be made, or the file is encoded (compressed), so
# use a generic bag-of-bits type.
ctype = 'application/octet-stream'
maintype, subtype = ctype.split('/', 1)
#logger.info("maintype:"+str(maintype)+" subtype:"+str(subtype))
msg = None
if maintype == 'text':
attach_file = open(attachment,'rb')
# TODO: we should handle calculating the charset
msg = MIMEText(attach_file.read(), _subtype=subtype, _charset='utf-8')
attach_file.close()
elif maintype == 'image':
attach_file = open(attachment, 'rb')
msg = MIMEImage(attach_file.read(), _subtype=subtype)
attach_file.close()
elif maintype == 'audio':
attach_file = open(attachment, 'rb')
msg = MIMEAudio(attach_file.read(), _subtype=subtype)
attach_file.close()
else:
attach_file = open(attachment, 'rb')
msg = MIMEBase(maintype, subtype)
msg.set_payload(attach_file.read())
attach_file.close()
# Encode the payload using Base64
encoders.encode_base64(msg)
# Set the filename parameter
msg.add_header('Content-Disposition', 'attachment',
filename=os.path.basename(attachment))
envelope.attach(msg)
#logger.info("envelope.as_string:"+envelope.as_string())
return envelope.as_string()

View File

@@ -0,0 +1,11 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This file is part of robotframework-Smtp3Library.
# https://github.io/lucamaro/robotframework-SmtpLibrary
# Licensed under the Apache License 2.0 license:
# http://www.opensource.org/licenses/Apache-2.0
# Copyright (c) 2016, Luca Maragnani <luca.maragnani@gmail.com>
__version__ = '0.1.3' # NOQA