Python-并发与多线程

TomTao626 于 2019-04-23 发布
🥰本站访客数 👀本文阅读量

#线程安全/竞争条件,锁/死锁检测,线程池,生产消费模型,伪并发,微线程,协程

#Stackless Python 是Python编程语言的一个增强版本,它使程序员从基于线程的编程方式中获得好处,并避免传统线程所带来的性能与复杂度问题。Stackless为 Python带来的微线程扩展,是一种低开销、轻量级的便利工具

Queue队列

import Queue
q = = Queue.Queue(3)

q.put('a', True, 5)              # True等待超时时间, False不等待
if q.full():                     # 队列满了返回True,反之False
    q.qsize()                    # 队列长度
    workQueue.queue.clear()      # 清空队列
q.get(True,5)                    # True等待超时时间, False不等待

thread/threading多线程

thread

start_new_thread(function,args kwargs=None)    # 产生一个新的线程
allocate_lock()                                # 分配一个LockType类型的锁对象
exit()                                         # 让线程退出
acquire(wait=None)                             # 尝试获取锁对象
locked()                                       # 如果获取了锁对象返回True
release()                                      # 释放锁

thread例子1

#!/usr/bin/env python
#thread_test.py
#不支持守护进程
import thread
from time import sleep,ctime

loops = [4,2]

def loop(nloop,nsec,lock):
    print('start loop %s at:%s' % (nloop,ctime()))
    sleep(nsec)
    print('loop %s done at: %s' % (nloop, ctime()))
    lock.release()              # 分配已获得的锁,操作结束后释放相应的锁通知主线程

def main():
    print('starting at:',ctime())
    locks = []
    nloops = range(len(loops))

    for i in nloops:
        lock = thread.allocate_lock()     # 创建一个锁
        lock.acquire()                    # 调用各个锁的acquire()函数获得锁
        locks.append(lock)                # 把锁放到锁列表locks中
    for i in nloops:
        thread.start_new_thread(loop,(i,loops[i],locks[i]))   # 创建线程
    for i in nloops:
        while locks[i].locked():pass      # 等待全部解锁才继续运行
    print('all DONE at:',ctime())

if __name__ == '__main__':
    main()

thread例子2

#coding=utf-8
import thread,time,os

def f(name):
    i =3
    while i:
        time.sleep(1)
        print(name
        i -= 1
    # os._exit()   会把整个进程关闭
    os._exit(22)

if __name__ == '__main__':
    thread.start_new_thread(f,("th1",))
    while 1:
        pass
    os._exit(0)

threading

Thread                   # 表示一个线程的执行的对象
    start()              # 开始线程的执行
    run()                # 定义线程的功能的函数(一般会被子类重写)
    join(timeout=None)   # 允许主线程等待线程结束,程序挂起,直到线程结束;如果给了timeout,则最多等待timeout秒.
    getName()            # 返回线程的名字
    setName(name)        # 设置线程的名字
    isAlive()            # 布尔标志,表示这个线程是否还在运行中
    isDaemon()           # 返回线程的daemon标志
    setDaemon(daemonic)  # 后台线程,把线程的daemon标志设置为daemonic(一定要在调用start()函数前调用)
    # 默认主线程在退出时会等待所有子线程的结束。如果希望主线程不等待子线程,而是在退出时自动结束所有的子线程,就需要设置子线程为后台线程(daemon)
Lock              # 锁原语对象
Rlock             # 可重入锁对象.使单线程可以在此获得已获得了的锁(递归锁定)
Condition         # 条件变量对象能让一个线程停下来,等待其他线程满足了某个条件.如状态改变或值的改变
Event             # 通用的条件变量.多个线程可以等待某个事件的发生,在事件发生后,所有的线程都会被激活
Semaphore         # 为等待锁的线程提供一个类似等候室的结构
BoundedSemaphore  # 与Semaphore类似,只是不允许超过初始值
Time              # 与Thread相似,只是他要等待一段时间后才开始运行
activeCount()     # 当前活动的线程对象的数量
currentThread()   # 返回当前线程对象
enumerate()       # 返回当前活动线程的列表
settrace(func)    # 为所有线程设置一个跟踪函数
setprofile(func)  # 为所有线程设置一个profile函数

threading例子1

#!/usr/bin/env python
#encoding:utf8
import threading
from Queue import Queue
from time import sleep,ctime

class ThreadFunc(object):
    def __init__(self,func,args,name=''):
        self.name=name
        self.func=func                    # loop
        self.args=args                    # (i,iplist[i],queue)
    def __call__(self):
        apply(self.func,self.args)        # 函数apply() 执行loop函数并传递元组参数
def loop(nloop,ip,queue):
    print('start',nloop,'at:',ctime())
    queue.put(ip)
    sleep(2)
    print('loop',nloop,'done at:',ctime())
if __name__ == '__main__':
    threads = []
    queue = Queue()
    iplist = ['192.168.1.2','192.168.1.3','192.168.1.4','192.168.1.5','192.168.1.6','192.168.1.7','192.168.1.8']
    nloops = range(len(iplist))

    for i in nloops:
        t = threading.Thread(target=ThreadFunc(loop,(i,iplist[i],queue),loop.__name__))
        threads.append(t)
    for i in nloops:
        threads[i].start()
    for i in nloops:
        threads[i].join()
    for i in nloops:
        print(queue.get())

threading例子2

#!/usr/bin/env python
#encoding:utf8
from Queue import Queue
import random,time,threading

class Producer(threading.Thread):
    def __init__(self, t_name, queue):
        threading.Thread.__init__(self, name=t_name)
        self.data=queue
    def run(self):
        for i in range(5):
            print("%s: %s is producing %d to the queue!\n" %(time.ctime(), self.getName(), i)
            self.data.put(i)
            self.data.put(i*i)
            time.sleep(2)
        print("%s: %s finished!" %(time.ctime(), self.getName()))

class Consumer(threading.Thread):
    def __init__(self, t_name, queue):
        threading.Thread.__init__(self, name=t_name)
        self.data=queue
    def run(self):
        for i in range(10):
            val = self.data.get()
            print("%s: %s is consuming. %d in the queue is consumed!\n" %(time.ctime(), self.getName(), val))
        print("%s: %s finished!" %(time.ctime(), self.getName()))

if __name__ == '__main__':
    queue = Queue()
    producer = Producer('Pro.', queue)
    consumer = Consumer('Con.', queue)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()

threading例子3

# 启动线程后自动执行 run函数其他不可以
import threading
import time

class Th(threading.Thread):
    def __init__(self,name):
        threading.Thread.__init__(self)
        self.t_name=name
        self.daemon = True     # 默认为false,让主线程等待处理完成
    def run(self):
        time.sleep(1)
        print("this is " + self.t_name)

if __name__ == '__main__':
    thread1 = Th("Th_1")
    thread1.start()

threading例子4

import threading
import time
class Th(threading.Thread):
    def __init__(self,thread_name):
        threading.Thread.__init__(self)
        self.setName(thread_name)
    def run(self):
        threadLock.acquire()
        print(self.getName())
        for i in range(3):
            time.sleep(1)
            print(str(i))
        print(self.getName() +  " is over")
        threadLock.release()

if __name__ == '__main__':
    threadLock = threading.Lock()
    thread1 = Th("Th_1")
    thread2 = Th("Th_2")
    thread1.start()
    thread2.start()

后台线程

import threading
import time,random

class MyThread(threading.Thread):
    def run(self):
        wait_time=random.randrange(1,10)
        print(%s will wait %d seconds" % (self.name, wait_time))
        time.sleep(wait_time)
        print("%s finished!" % self.name)

if __name__=="__main__":
    for i in range(5):
        t = MyThread()
        t.setDaemon(True)    # 设置为后台线程,主线程完成时不等待子线程完成就结束
        t.start()

threading控制最大并发_查询日志中IP信息

#!/usr/bin/env python
#coding:utf-8
import urllib2
import json
import threading
import time

'''
多线程并发控制. 如果要改成多进程,只需把threading 换成 mulitprocessing.Process , 对, 就是换个名字而已.
'''

#获取ip 及其出现次数
def ip_dic(file_obj, dic):
    for i in file_obj:
        if i:
            ip=i.split('-')[0].strip()
            if ip in dic.keys():
                dic[ip]=dic[ip] + 1
            else:
                dic[ip]=1
    return dic.iteritems()

#目标函数
def get_data(url, ipcounts):
    data=urllib2.urlopen(url).read()
    datadict=json.loads(data)
    fdata = u"ip:%s---%s,%s,%s,%s,%s" %(datadict["data"]["ip"],ipcounts,datadict["data"]["country"],datadict["data"]["region"],datadict["data"]["city"],datadict["data"]["isp"])
    print(fdata)

#多线程
def threads(iters):
    thread_pool = []
    for k in iters:
        url = "http://ip.taobao.com/service/getIpInfo.php?ip="
        ipcounts = k[1]
        url = (url + k[0]).strip()
        t = threading.Thread(target=get_data, args=(url, ipcounts))
        thread_pool.append(t)
    return thread_pool

#控制多线程
def startt(t_list, max,second):
    l = len(t_list)
    n = max
    while l > 0:
        if l > max:
            nl = t_list[:max]
            t_list = t_list[max:]
            for t in nl:
                t.start()
            time.sleep(second)
            for t in nl:
                t.join()
            print('*'*15,  str(n)+ ' ip has been queried'+'*'*15)
            n += max
            l = len(t_list)
            continue
        elif l <= max:
            nl = t_list
            for t in nl:
                t.start()
            for t in nl:
                t.join()
            print('>>> Totally ' + str(n+l ) + ' ip has been queried')
            l = 0

if __name__ =="__main__":
    dic={}
    with open('access.log') as file_obj:
        it = ip_dic(file_obj, dic)
        t_list= threads(it)
        startt(t_list, 15, 1)

多线程取队列

#!/usr/bin/python

import Queue
import threading
import time

exitFlag = 0

class myThread(threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q
    def run(self):
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)

def process_data(threadName, q):
    while not exitFlag:      # 死循环等待
        queueLock.acquire()
        if not q.empty():    # 判断队列是否为空
            data = q.get()
            print("%s processing %s" % (threadName, data))
        queueLock.release()
        time.sleep(1)

nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()     # 锁与队列并无任何关联,其他线程也进行取锁操作的时候就会检查是否有被占用,有就阻塞等待解锁为止
workQueue = Queue.Queue(10)
threads = []
threadID = 1

# Create new threads
for threadID in range(100):
    thread = myThread(threadID, 'tName%s' % threadID, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1

# Fill the queue
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()

# Wait for queue to empty
while not workQueue.empty():   # 死循环判断队列被处理完毕
    pass

# Notify threads it's time to exit
exitFlag = 1

# Wait for all threads to complete
for t in threads:
    t.join()
print("Exiting Main Thread")

Queue通用队列

q=Queue(size)       # 创建大小size的Queue对象
qsize()             # 返回队列的大小(返回时候,可能被其他进程修改,近似值)
empty()             # 如果队列为空返回True,否则False
full()              # 如果队列已满返回True,否则False
put(item,block0)    # 把item放到队列中,如果给了block(不为0),函数会一直阻塞到队列中有空间为止
get(block=0)        # 从队列中取一个对象,如果给了block(不为0),函数会一直阻塞到队列中有对象为止
get_nowait          # 默认get阻塞,这个不阻塞

multiprocessing [多进程并发]

线程池

import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls=['http://www.baidu.com','http://www.sohu.com']

pool=ThreadPool(4)   # 线程池
results=pool.map(urllib2.urlopen,urls)
pool.close()
pool.join()

进程并发

#!/usr/bin/env python
#encoding:utf8
from multiprocessing import Process
import time,os
def f(name):
    time.sleep(1)
    print('hello ',name)
    print(os.getppid())   # 取得父进程ID
    print(os.getpid())    # 取得进程ID
process_list = []

for i in range(10):
    p = Process(target=f,args=(i,))
    p.start()
    process_list.append(p)
for j in process_list:
    j.join()

进程池

#!/usr/bin/env python
#encoding:utf8
from multiprocessing import Pool
import time,os
def f(name):
    time.sleep(1)
    print('hello ',name)
    print(os.getppid())
    print(os.getpid())
process_list = []

pool = Pool(4)
res = pool.map(f, range(1,10))
pool.close()
pool.join()

Queue进程间通信

from multiprocessing import Process,Queue
import time
def f(name):
    time.sleep(1)
    q.put(['hello'+str(name)])
process_list = []
q = Queue()
if __name__ == '__main__':
    for i in range(10):
        p = Process(target=f,args=(i,))
        p.start()
        process_list.append(p)
    for j in process_list:
        j.join()
    for i in range(10):
        print(q.get())

Pipe管道 # 单项通信

from multiprocessing import Process,Pipe
import time
import os

def f(conn,name):
    time.sleep(1)
    conn.send(['hello'+str(name)])
    print(os.getppid(),'-----------',os.getpid())
process_list = []
parent_conn,child_conn = Pipe()
if __name__ == '__main__':
    for i in range(10):
        p = Process(target=f,args=(child_conn,i))
        p.start()
        process_list.append(p)
    for j in process_list:
        j.join()
    for p in range(10):
        print(parent_conn.recv())

进程间同步

#加锁,使某一时刻只有一个进程,其他在调用同一个锁就会被阻塞
from multiprocessing import Process,Lock
import time
import os

def f(name):
    lock.acquire()
    time.sleep(1)
    print('hello--'+str(name))
    print(os.getppid(),'-----------',os.getpid())
    lock.release()
process_list = []
lock = Lock()
if __name__ == '__main__':
    for i in range(10):
        p = Process(target=f,args=(i,))
        p.start()
        process_list.append(p)
    for j in process_list:
        j.join()

共享内存 # 双向通信

from multiprocessing import Process,Value,Array
import time
import os

def f(n,a,name):
    time.sleep(1)
    n.value = name * name
    for i in range(len(a)):
        a[i] = -i
process_list = []
if __name__ == '__main__':
    num = Value('d',0.0)
    arr = Array('i',range(10))
    for i in range(10):
        p = Process(target=f,args=(num,arr,i))
        p.start()
        process_list.append(p)
    for j in process_list:
        j.join()
    print(num.value)
    print(arr[:])
# 通过使用Value或者Array把数据存储在一个共享的内存表中
# 'd'和'i'参数是num和arr用来设置类型,d表示一个双精浮点类型,i表示一个带符号的整型。

manager

# 比共享内存灵活,但缓慢
# 支持list,dict,Namespace,Lock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value,Array
from multiprocessing import Process,Manager
import time
import os

def f(d,name):
    time.sleep(1)
    d[name] = name * name
    print(d)
process_list = []
if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    for i in range(10):
        p = Process(target=f,args=(d,i))
        p.start()
        process_list.append(p)
    for j in process_list:
        j.join()
        print(d)

最大并发数

import multiprocessing
import time,os

result = []
def run(h):
    print('threading:' ,h,os.getpid()
p = multiprocessing.Pool(processes=20)

for i in range(100):
    result.append(p.apply_async(run,(i,)))
p.close()

for res in result:
    res.get(timeout=5)

gevent [轻量级协程]

# 在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
# http://xlambda.com/gevent-tutorial/

锁的使用

# 同时允许多个协程操作对象的锁,通过互斥访问,保证资源只在程序上下文被单次使用
from gevent import sleep
from gevent.pool import Pool
from gevent.coros import BoundedSemaphore

sem = BoundedSemaphore(2)        # 超过2就会阻塞等待

def worker1(n):
    sem.acquire()
    print('Worker %i acquired semaphore' % n)
    sleep(0)
    sem.release()
    print('Worker %i released semaphore' % n)

def worker2(n):
    with sem:
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    print('Worker %i released semaphore' % n)

pool = Pool()
pool.map(worker1, xrange(0,2))
pool.map(worker2, xrange(3,6))

事件

Event 阻塞事件

import gevent
from gevent.event import Event

evt = Event()

def setter():
    '''After 3 seconds, wake all threads waiting on the value of evt'''
    print('A: Hey wait for me, I have to do something')
    gevent.sleep(3)
    print("Ok, I'm done")
    evt.set()                 # 表示事件完成

def waiter():
    '''After 3 seconds the get call will unblock'''
    print("I'll wait for you")
    evt.wait()                # 阻塞等待事件完成
    print("It's about time")

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
    gevent.spawn(waiter),
    gevent.spawn(waiter),
    gevent.spawn(waiter),
    gevent.spawn(waiter)
])

AsyncResult 可传值的事件

import gevent
from gevent.event import AsyncResult
a = AsyncResult()

def setter():
    gevent.sleep(3)
    a.set('Hello!')            # 事件传值

def waiter():
    """
    After 3 seconds the get call will unblock after the setter
    puts a value into the AsyncResult.
    """
    print(a.get())             # 获取时间值

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])

队列

#/usr/local/python
#encoding:utf8
import gevent
from gevent.pool import Pool
from gevent.coros import BoundedSemaphore
from gevent.queue import Queue, Empty
import os

tasks = Queue(maxsize=30)         # 队列 超过30引发 gevent.hub.LoopExit
tasks1 = Queue()

def boss():
    print('放队列任务')
    for i in xrange(1,25):
        tasks.put(i)

def worker1(n):
    print(len(pool))
    while not tasks.empty():      # 判断队列是否为空
        task = tasks.get()        # 获取队列内容
        tasks1.put(os.popen('id').read())
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(0)           # 放弃当前任务

def worker2(name):
    try:
        while True:
            task = tasks1.get(timeout=2)
            print('获取后释放:%s' % task)
            gevent.sleep(0)
    except Empty:                 # 等待超时报错完成
        print('Quitting time!')

gevent.spawn(boss).join()         # 执行单次协程任务

pool = Pool(5)                    # 协程池大小
pool.map(worker1, xrange(0,20))   # 通过map方法把多个任务分发给池中的5个协程

gevent.joinall([                  # 同时执行多个协程任务
    gevent.spawn(worker2, 'steve'),
    gevent.spawn(worker2, 'john'),
    gevent.spawn(worker2, 'nancy'),
])