# Thread safety/race conditions, lock/deadlock detection, thread pools, producer-consumer model, pseudo-concurrency, microthreads, coroutines
# Stackless Python is an enhanced version of the Python interpreter. It gives programmers the benefits of thread-based programming while avoiding the performance and complexity problems of traditional threads. The microthread extension Stackless brings to Python is a low-overhead, lightweight convenience tool.
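A taste of the microthread model, as a hedged sketch: this requires the Stackless interpreter rather than stock CPython, and assumes its usual tasklet/run/schedule entry points.
import stackless  # only available under the Stackless Python interpreter

def worker(name):
    for i in range(3):
        print('%s step %d' % (name, i))
        stackless.schedule()    # cooperatively yield to the other tasklets

stackless.tasklet(worker)('A')  # create and schedule a microthread
stackless.tasklet(worker)('B')
stackless.run()                 # run the scheduler until all tasklets finish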
Queue (the Queue module)
import Queue
q = Queue.Queue(3)
q.put('a', True, 5)  # block=True waits up to the 5-second timeout; block=False does not wait
q.full()             # returns True when the queue is full, otherwise False
q.qsize()            # queue length
q.queue.clear()      # clear the underlying deque
q.get(True, 5)       # block=True waits up to the 5-second timeout; block=False does not wait
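A minimal sketch of the blocking semantics above, assuming Python 2's Queue module: put() with a timeout raises Queue.Full if the queue stays full, and get() raises Queue.Empty on timeout.
import Queue

q = Queue.Queue(2)
q.put('a')
q.put('b')
try:
    q.put('c', True, 1)   # queue is full: blocks up to 1 second, then raises
except Queue.Full:
    print('queue stayed full for 1 second')
print(q.get())            # 'a'
print(q.get())            # 'b'
try:
    q.get(True, 1)        # queue is empty: blocks up to 1 second, then raises
except Queue.Empty:
    print('queue stayed empty for 1 second')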
thread/threading (multithreading)
thread
start_new_thread(function, args, kwargs=None)  # spawn a new thread
allocate_lock()                                # allocate a LockType lock object
exit()                                         # make the thread exit
acquire(wait=None)                             # try to acquire the lock object
locked()                                       # returns True if the lock is held
release()                                      # release the lock
thread example 1
#!/usr/bin/env python
# thread_test.py
# the thread module does not support daemon threads
import thread
from time import sleep, ctime

loops = [4, 2]

def loop(nloop, nsec, lock):
    print('start loop %s at:%s' % (nloop, ctime()))
    sleep(nsec)
    print('loop %s done at: %s' % (nloop, ctime()))
    lock.release()  # release the pre-acquired lock to tell the main thread this loop is done

def main():
    print('starting at:', ctime())
    locks = []
    nloops = range(len(loops))
    for i in nloops:
        lock = thread.allocate_lock()  # create a lock
        lock.acquire()                 # acquire each lock up front
        locks.append(lock)             # keep the locks in the locks list
    for i in nloops:
        thread.start_new_thread(loop, (i, loops[i], locks[i]))  # spawn the threads
    for i in nloops:
        while locks[i].locked(): pass  # spin until every lock has been released
    print('all DONE at:', ctime())

if __name__ == '__main__':
    main()
thread example 2
#coding=utf-8
import thread, time, os

def f(name):
    i = 3
    while i:
        time.sleep(1)
        print(name)
        i -= 1
    # os._exit() shuts down the whole process
    os._exit(22)

if __name__ == '__main__':
    thread.start_new_thread(f, ("th1",))
    while 1:
        pass
    os._exit(0)
threading
Thread               # an object representing a single thread of execution
start()              # start running the thread
run()                # the body of the thread (usually overridden in a subclass)
join(timeout=None)   # suspend the caller until the thread finishes; with timeout, wait at most timeout seconds
getName()            # return the thread's name
setName(name)        # set the thread's name
isAlive()            # boolean flag: is this thread still running?
isDaemon()           # return the thread's daemon flag
setDaemon(daemonic)  # set the daemon flag to daemonic (must be called before start())
# By default the main thread waits for all child threads to finish before exiting. If you want the main
# thread to exit without waiting, terminating all children automatically, mark the children as daemon threads.
Lock                 # primitive lock object
RLock                # re-entrant lock: the same thread may acquire a lock it already holds (recursive locking)
Condition            # condition variable: lets a thread pause until another thread signals a condition, e.g. a state or value change
Event                # generalized condition variable: many threads can wait for an event, and all of them wake up when it fires
Semaphore            # gives threads waiting for a lock a "waiting room" of fixed capacity
BoundedSemaphore     # like Semaphore, but the counter may never exceed its initial value
Timer                # like Thread, but waits for a delay before it starts running
activeCount()        # number of currently active thread objects
currentThread()      # return the current thread object
enumerate()          # return a list of the currently active threads
settrace(func)       # set a trace function for all threads
setprofile(func)     # set a profile function for all threads
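A minimal sketch of the Event primitive listed above: several threads block on one event, and all of them wake when it is set.
import threading, time

evt = threading.Event()

def waiter(n):
    evt.wait()                   # block until the event is set
    print('waiter %d woke up' % n)

for n in range(3):
    threading.Thread(target=waiter, args=(n,)).start()
time.sleep(1)
evt.set()                        # wake all waiting threads at once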
threading example 1
#!/usr/bin/env python
#encoding:utf8
import threading
from Queue import Queue
from time import sleep, ctime

class ThreadFunc(object):
    def __init__(self, func, args, name=''):
        self.name = name
        self.func = func  # loop
        self.args = args  # (i, iplist[i], queue)
    def __call__(self):
        apply(self.func, self.args)  # apply() calls the loop function with the tuple of arguments

def loop(nloop, ip, queue):
    print('start', nloop, 'at:', ctime())
    queue.put(ip)
    sleep(2)
    print('loop', nloop, 'done at:', ctime())

if __name__ == '__main__':
    threads = []
    queue = Queue()
    iplist = ['192.168.1.2','192.168.1.3','192.168.1.4','192.168.1.5','192.168.1.6','192.168.1.7','192.168.1.8']
    nloops = range(len(iplist))
    for i in nloops:
        t = threading.Thread(target=ThreadFunc(loop, (i, iplist[i], queue), loop.__name__))
        threads.append(t)
    for i in nloops:
        threads[i].start()
    for i in nloops:
        threads[i].join()
    for i in nloops:
        print(queue.get())
threading example 2
#!/usr/bin/env python
#encoding:utf8
from Queue import Queue
import random, time, threading

class Producer(threading.Thread):
    def __init__(self, t_name, queue):
        threading.Thread.__init__(self, name=t_name)
        self.data = queue
    def run(self):
        for i in range(5):
            print("%s: %s is producing %d to the queue!\n" % (time.ctime(), self.getName(), i))
            self.data.put(i)
            self.data.put(i*i)
            time.sleep(2)
        print("%s: %s finished!" % (time.ctime(), self.getName()))

class Consumer(threading.Thread):
    def __init__(self, t_name, queue):
        threading.Thread.__init__(self, name=t_name)
        self.data = queue
    def run(self):
        for i in range(10):
            val = self.data.get()
            print("%s: %s is consuming. %d in the queue is consumed!\n" % (time.ctime(), self.getName(), val))
        print("%s: %s finished!" % (time.ctime(), self.getName()))

if __name__ == '__main__':
    queue = Queue()
    producer = Producer('Pro.', queue)
    consumer = Consumer('Con.', queue)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
threading example 3
# after start(), run() is executed automatically; other methods are not auto-invoked
import threading
import time

class Th(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.t_name = name
        self.daemon = True  # defaults to False (main thread waits for the child); True lets the main thread exit without waiting, so this example may print nothing
    def run(self):
        time.sleep(1)
        print("this is " + self.t_name)

if __name__ == '__main__':
    thread1 = Th("Th_1")
    thread1.start()
threading example 4
import threading
import time

class Th(threading.Thread):
    def __init__(self, thread_name):
        threading.Thread.__init__(self)
        self.setName(thread_name)
    def run(self):
        threadLock.acquire()
        print(self.getName())
        for i in range(3):
            time.sleep(1)
            print(str(i))
        print(self.getName() + " is over")
        threadLock.release()

if __name__ == '__main__':
    threadLock = threading.Lock()
    thread1 = Th("Th_1")
    thread2 = Th("Th_2")
    thread1.start()
    thread2.start()
Daemon (background) threads
import threading
import time, random

class MyThread(threading.Thread):
    def run(self):
        wait_time = random.randrange(1, 10)
        print("%s will wait %d seconds" % (self.name, wait_time))
        time.sleep(wait_time)
        print("%s finished!" % self.name)

if __name__ == "__main__":
    for i in range(5):
        t = MyThread()
        t.setDaemon(True)  # daemon thread: the main thread exits without waiting for the children to finish
        t.start()
threading: capping max concurrency (querying IP info from a log)
#!/usr/bin/env python
#coding:utf-8
import urllib2
import json
import threading
import time
'''
Bounded multithreaded concurrency. To switch to multiprocessing, just swap
threading.Thread for multiprocessing.Process; it really is just a rename.
'''
# collect each IP and how many times it appears
def ip_dic(file_obj, dic):
    for i in file_obj:
        if i:
            ip = i.split('-')[0].strip()
            if ip in dic.keys():
                dic[ip] = dic[ip] + 1
            else:
                dic[ip] = 1
    return dic.iteritems()

# worker function
def get_data(url, ipcounts):
    data = urllib2.urlopen(url).read()
    datadict = json.loads(data)
    fdata = u"ip:%s---%s,%s,%s,%s,%s" % (datadict["data"]["ip"], ipcounts, datadict["data"]["country"], datadict["data"]["region"], datadict["data"]["city"], datadict["data"]["isp"])
    print(fdata)

# build the threads
def threads(iters):
    thread_pool = []
    for k in iters:
        url = "http://ip.taobao.com/service/getIpInfo.php?ip="
        ipcounts = k[1]
        url = (url + k[0]).strip()
        t = threading.Thread(target=get_data, args=(url, ipcounts))
        thread_pool.append(t)
    return thread_pool

# start the threads in bounded batches
def startt(t_list, max, second):
    l = len(t_list)
    n = max
    while l > 0:
        if l > max:
            nl = t_list[:max]
            t_list = t_list[max:]
            for t in nl:
                t.start()
            time.sleep(second)
            for t in nl:
                t.join()
            print('*' * 15, str(n) + ' ip has been queried', '*' * 15)
            n += max
            l = len(t_list)
            continue
        elif l <= max:
            nl = t_list
            for t in nl:
                t.start()
            for t in nl:
                t.join()
            print('>>> Totally ' + str(n + l) + ' ip has been queried')
            l = 0

if __name__ == "__main__":
    dic = {}
    with open('access.log') as file_obj:
        it = ip_dic(file_obj, dic)
    t_list = threads(it)
    startt(t_list, 15, 1)
Multiple threads draining a queue
#!/usr/bin/python
import Queue
import threading
import time

exitFlag = 0

class myThread(threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q
    def run(self):
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)

def process_data(threadName, q):
    while not exitFlag:    # spin until told to exit
        queueLock.acquire()
        if not q.empty():  # check whether the queue has work
            data = q.get()
            print("%s processing %s" % (threadName, data))
        queueLock.release()
        time.sleep(1)

nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()  # the lock has no inherent tie to the queue: any thread that tries to acquire it while it is held simply blocks until it is released
workQueue = Queue.Queue(10)
threads = []

# Create new threads
for threadID in range(100):
    thread = myThread(threadID, 'tName%s' % threadID, workQueue)
    thread.start()
    threads.append(thread)

# Fill the queue
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()

# Wait for queue to empty
while not workQueue.empty():  # spin until the queue has been drained
    pass

# Notify threads it's time to exit
exitFlag = 1

# Wait for all threads to complete
for t in threads:
    t.join()
print("Exiting Main Thread")
Queue (generic queue API)
q = Queue(size)     # create a Queue object with capacity size
qsize()             # return the queue size (approximate: other threads may change it before the value is used)
empty()             # True if the queue is empty, else False
full()              # True if the queue is full, else False
put(item, block=1)  # put item into the queue; if block is true, block until space is available
get(block=1)        # take an object off the queue; if block is true, block until an item is available
get_nowait()        # get() blocks by default; this variant never blocks
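A quick sketch of the non-blocking variants, assuming Python 2's Queue module: with block=0 (or the *_nowait helpers) the call raises immediately instead of waiting.
import Queue

q = Queue.Queue(1)
q.put_nowait('x')        # same as q.put('x', 0)
try:
    q.put_nowait('y')    # queue is full: raises at once, no waiting
except Queue.Full:
    print('full')
print(q.get_nowait())    # same as q.get(0), returns 'x'
try:
    q.get_nowait()       # queue is empty: raises at once
except Queue.Empty:
    print('empty')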
multiprocessing [process-based concurrency]
Thread pool
import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = ['http://www.baidu.com', 'http://www.sohu.com']
pool = ThreadPool(4)  # a pool of 4 worker threads
results = pool.map(urllib2.urlopen, urls)
pool.close()
pool.join()
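Besides map(), the same pool API offers apply_async() for submitting tasks one at a time and collecting results later. A small sketch; the fetch_len helper is illustrative, not part of the library.
import urllib2
from multiprocessing.dummy import Pool as ThreadPool

def fetch_len(url):
    # hypothetical helper: return the length of the page body
    return len(urllib2.urlopen(url).read())

pool = ThreadPool(4)
async_results = [pool.apply_async(fetch_len, (u,))
                 for u in ['http://www.baidu.com', 'http://www.sohu.com']]
pool.close()
pool.join()
print([r.get() for r in async_results])  # get() returns each result, re-raising any task exception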
Process concurrency
#!/usr/bin/env python
#encoding:utf8
from multiprocessing import Process
import time, os

def f(name):
    time.sleep(1)
    print('hello ', name)
    print(os.getppid())  # parent process ID
    print(os.getpid())   # this process's ID

process_list = []
for i in range(10):
    p = Process(target=f, args=(i,))
    p.start()
    process_list.append(p)
for j in process_list:
    j.join()
Process pool
#!/usr/bin/env python
#encoding:utf8
from multiprocessing import Pool
import time, os

def f(name):
    time.sleep(1)
    print('hello ', name)
    print(os.getppid())
    print(os.getpid())

pool = Pool(4)
res = pool.map(f, range(1, 10))  # blocks until all 9 tasks finish; f returns None, so res is a list of Nones
pool.close()
pool.join()
Queue: inter-process communication
from multiprocessing import Process, Queue
import time

def f(name):
    time.sleep(1)
    q.put(['hello' + str(name)])

process_list = []
q = Queue()
if __name__ == '__main__':
    for i in range(10):
        p = Process(target=f, args=(i,))
        p.start()
        process_list.append(p)
    for j in process_list:
        j.join()
    for i in range(10):
        print(q.get())
Pipe  # Pipe() is duplex (two-way) by default; pass duplex=False for a one-way pipe
from multiprocessing import Process, Pipe
import time
import os

def f(conn, name):
    time.sleep(1)
    conn.send(['hello' + str(name)])
    print(os.getppid(), '-----------', os.getpid())

process_list = []
parent_conn, child_conn = Pipe()
if __name__ == '__main__':
    for i in range(10):
        p = Process(target=f, args=(child_conn, i))
        p.start()
        process_list.append(p)
    for j in process_list:
        j.join()
    for p in range(10):
        print(parent_conn.recv())
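Since the default pipe is two-way, the parent can also send and the child receive on its own end; a minimal sketch:
from multiprocessing import Process, Pipe

def child(conn):
    msg = conn.recv()          # read the parent's message on the child end
    conn.send(msg.upper())     # reply over the same connection

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # duplex by default
    p = Process(target=child, args=(child_conn,))
    p.start()
    parent_conn.send('ping')
    print(parent_conn.recv())         # prints 'PING'
    p.join()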
Inter-process synchronization
# with the lock held, only one process runs the critical section at a time; any other process acquiring the same lock blocks
from multiprocessing import Process, Lock
import time
import os

def f(name):
    lock.acquire()
    time.sleep(1)
    print('hello--' + str(name))
    print(os.getppid(), '-----------', os.getpid())
    lock.release()

process_list = []
lock = Lock()
if __name__ == '__main__':
    for i in range(10):
        p = Process(target=f, args=(i,))
        p.start()
        process_list.append(p)
    for j in process_list:
        j.join()
Shared memory  # two-way communication
from multiprocessing import Process, Value, Array
import time
import os

def f(n, a, name):
    time.sleep(1)
    n.value = name * name
    for i in range(len(a)):
        a[i] = -i

process_list = []
if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    for i in range(10):
        p = Process(target=f, args=(num, arr, i))
        p.start()
        process_list.append(p)
    for j in process_list:
        j.join()
    print(num.value)
    print(arr[:])
# Value and Array store the data in a shared memory map.
# The 'd' and 'i' arguments set the types of num and arr: 'd' is a double-precision float, 'i' a signed integer.
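Note that a plain n.value = ... update is not atomic across processes; the synchronized Value wrapper exposes get_lock() for safe read-modify-write. A minimal counter sketch, assuming the standard multiprocessing API:
from multiprocessing import Process, Value

def add(counter):
    for _ in range(1000):
        with counter.get_lock():  # serialize the read-modify-write
            counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)       # shared signed int, initialized to 0
    ps = [Process(target=add, args=(counter,)) for _ in range(4)]
    for p in ps: p.start()
    for p in ps: p.join()
    print(counter.value)          # 4000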
manager
# more flexible than shared memory, but slower
# supports list, dict, Namespace, Lock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value, Array
from multiprocessing import Process, Manager
import time
import os

def f(d, name):
    time.sleep(1)
    d[name] = name * name
    print(d)

process_list = []
if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    for i in range(10):
        p = Process(target=f, args=(d, i))
        p.start()
        process_list.append(p)
    for j in process_list:
        j.join()
    print(d)
Maximum concurrency
import multiprocessing
import time, os

result = []

def run(h):
    print('threading:', h, os.getpid())

p = multiprocessing.Pool(processes=20)  # at most 20 worker processes
for i in range(100):
    result.append(p.apply_async(run, (i,)))
p.close()
for res in result:
    res.get(timeout=5)  # wait for each task (re-raises the task's exception, if any)
gevent [lightweight coroutines]
# The core abstraction in gevent is the Greenlet, a lightweight coroutine provided to Python as a C extension module. Greenlets all run inside the OS process of the main program, but they are scheduled cooperatively.
# http://xlambda.com/gevent-tutorial/
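A minimal sketch of cooperative scheduling with greenlets: gevent.sleep() yields control, so the two tasks interleave inside one OS thread.
import gevent

def task(name):
    for i in range(3):
        print('%s step %d' % (name, i))
        gevent.sleep(0)  # yield to the other greenlet

gevent.joinall([
    gevent.spawn(task, 'A'),
    gevent.spawn(task, 'B'),
])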
Using locks
# a semaphore is a lock that several coroutines may hold at once; mutual exclusion still guarantees the resource is used by only a bounded number of contexts at a time
from gevent import sleep
from gevent.pool import Pool
from gevent.coros import BoundedSemaphore

sem = BoundedSemaphore(2)  # more than 2 concurrent holders will block and wait

def worker1(n):
    sem.acquire()
    print('Worker %i acquired semaphore' % n)
    sleep(0)
    sem.release()
    print('Worker %i released semaphore' % n)

def worker2(n):
    with sem:
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    print('Worker %i released semaphore' % n)

pool = Pool()
pool.map(worker1, xrange(0, 2))
pool.map(worker2, xrange(3, 6))
Events
Event: a blocking event
import gevent
from gevent.event import Event

evt = Event()

def setter():
    '''After 3 seconds, wake all threads waiting on the value of evt'''
    print('A: Hey wait for me, I have to do something')
    gevent.sleep(3)
    print("Ok, I'm done")
    evt.set()   # mark the event as done

def waiter():
    '''After 3 seconds the get call will unblock'''
    print("I'll wait for you")
    evt.wait()  # block until the event is set
    print("It's about time")

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
    gevent.spawn(waiter),
    gevent.spawn(waiter),
    gevent.spawn(waiter),
    gevent.spawn(waiter)
])
AsyncResult: an event that carries a value
import gevent
from gevent.event import AsyncResult

a = AsyncResult()

def setter():
    gevent.sleep(3)
    a.set('Hello!')  # fire the event with a value

def waiter():
    """
    After 3 seconds the get call will unblock after the setter
    puts a value into the AsyncResult.
    """
    print(a.get())   # fetch the event's value (blocks until it is set)

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])
Queues
#!/usr/bin/env python
#encoding:utf8
import gevent
from gevent.pool import Pool
from gevent.queue import Queue, Empty
import os

tasks = Queue(maxsize=30)  # bounded queue; overfilling it with no consumer raises gevent.hub.LoopExit
tasks1 = Queue()

def boss():
    print('queueing the tasks')
    for i in xrange(1, 25):
        tasks.put(i)

def worker1(n):
    print(len(pool))
    while not tasks.empty():  # check whether the queue is empty
        task = tasks.get()    # take an item off the queue
        tasks1.put(os.popen('id').read())
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(0)       # yield control

def worker2(name):
    try:
        while True:
            task = tasks1.get(timeout=2)
            print('fetched and released: %s' % task)
            gevent.sleep(0)
    except Empty:             # finish once get() times out on the empty queue
        print('Quitting time!')

gevent.spawn(boss).join()         # run a single coroutine task to completion
pool = Pool(5)                    # coroutine pool size
pool.map(worker1, xrange(0, 20))  # map distributes the 20 tasks over the pool's 5 coroutines
gevent.joinall([                  # run several coroutine tasks concurrently
    gevent.spawn(worker2, 'steve'),
    gevent.spawn(worker2, 'john'),
    gevent.spawn(worker2, 'nancy'),
])