【Windows原理】异步IO-完成端口(IOCP)

// 08.异步IO_完成端口(IOCP)-复习.cpp : 定义控制台应用程序的入口点。
//


// 同步IO的缺点是, 在读写文件时, 如果文件太大, 或者读写的时间太长, 就会在读写函数中
// 阻塞住.  
// 异步IO解决了这个问题, 异步IO读写文件时, 文件再大也不会阻塞住
// 但是异步IO要完成这样的特性是有一点付出的
// 异步读写文件后, 需要通过一些方式才能知道文件读写(IO任务)什么时候完成. 
// 这里讲的是第四个方式, 通过完成端口来处理已完成的IO任务
// 
// 事件对象和可提醒IO已经能够让我们及时处理完成的IO任务.但是这两种办法并不是最高效率
// 的. 事件对象模型中 , 每处理一个IO任务就需要创建一个线程.这就好比一个银行 , 每处
// 理一个客户就新建一个窗口.这是非常不合理的.线程的创建和销毁会耗费系统非常多的资源, 
// 而且线程多了 , 就会产生线程切换.这样一来 , 线程越多 , 效率反而越低.
// 可提醒IO虽然好用 , 但如果IO任务多了, 那么.线程的APC队列中的元素就会变多 , APC队
// 列中函数的调用无疑会将整个线程都拖慢.
// 完成端口就是一种综合了两者的优点 , 避开了两者的缺点的优秀IO通知模型.
// 完成端口的创建需要两步,这两步都是在调用同一个函数:CreateIoCompletionPort
// 在第二次调用CreateIoCompletionPort 后( 将文件对象和完成端口对象进行关联后 ) , 
// 所有发生在文件对象上的IO任务都会被系统发送到完成端口的队列中.
// 比如 , 当我们ReadFile 一个文件对象时,在使用事件对象模型时,系统会在IO任务完成后
// 触发OVERLAPPED 中的hEvent 事件对象.我们使用WaitForSignalObject 函数进行等待
// 的线程会被唤醒.
// 在使用可提醒IO的完成函数中 ,系统会在IO任务完成后 ,将完成函数发送到线程的APC队列.
// APC队列会在线程挂起时会被调用 ,我们的完成函数也就是在APC队列中的函数被调用时被调
// 用的.并且在调用函数时 , 将OVERLAPPED 结构体变量的地址通过形参传递进函数.
// 在使用完成端口时,系统会检测我们的文件句柄有没有和完成端口进行关联.如果是有关联的, 
// 那么系统会在IO任务完成后 , 将OVERLAPPED结构体和一些其它信息发送到完成端口的队列
// 中( 可以使用PostQueuedCompetionStatus 函数来发送 ).当我们在一个线程中使用
// GetQueuedCompletionStatus 时 , 这个函数实际上就是在等待完成端口中的队列是否有
// 值 , 如果没有值 , 那么调用这个函数的线程会被挂起.如果队列中有值了 , 这个线程会被
// 唤醒 , 并且GetQueuedCompletionStatus 函数会将已经完成的IO任务的OVERLAPPED 
// 结构体变量的地址获取出来.
// 至于GetQueuedCompletionStatus 的调用地点 , 就必须是在线程当中 , 因为这个函数会
// 因为完成队列中没有值时将线程挂起.
// 而调用GetQueuedCompletionStatus 函数的线程如果只有一个 , 那么完成端口的效率并不
// 会太好.因为完成队列中如果同时保存了几百个值 , 那么只有一个线程在调用
// GetQueuedCompletionStatus 函数时 , 线程就会将队列中的值一个接着一个地进行处理.
// 但如果调用GetQueuedCompletionStatus 函数的线程有十个,当完成队列中存在几百个值, 
// 那么十个线程会同时从完成队列中取出值来进行处理 , 这样一来 , 就能每次处理10个.
// 但是前面说了 , 线程并非越多越好.线程多了 , 反而可能将效率拖得更低.微软官方的文档有
// 说过 , 线程的个数 , 最好是CPU个数的2倍.这是能够达到最优效率的线程个数了.
// 
#include "stdafx.h"
#include <windows.h>
#include <process.h>

struct MyOVERLAPPED:public OVERLAPPED
{
public:	
	char * pBuff; 
	int nIndex;
	MyOVERLAPPED();
	MyOVERLAPPED(int nIoSIze, int nFileOffsetLow, int nFileOffsetHight = 0);
	~MyOVERLAPPED();
};

// 等待完成端口通知线程
unsigned int __stdcall WaitComplePortNotify(void * pArg);

int _tmain(int argc, _TCHAR* argv[])
{
	HANDLE hFile = CreateFile(L"1.txt",GENERIC_READ,FILE_SHARE_READ,NULL,OPEN_EXISTING,FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPED,NULL); // 使用异步IO

	if (hFile == INVALID_HANDLE_VALUE)return 0;

	// 获取系统信息(主要是获取处理器个数)
	SYSTEM_INFO si = { 0 };
	GetSystemInfo(&si);

	// 创建完成端口
	HANDLE hComplePort = CreateIoCompletionPort(
		INVALID_HANDLE_VALUE,// 创建完成端口时,不用传递设备对象句柄
		NULL,// 已经创建完成端口,穿件完成端口时,需要填NULL
		0,// 创建完成端口时,不用传
		si.dwNumberOfProcessors // 处理器数量,传0时使用默认的个数
		);

	// 将文件对象和完成端口进行管理
	CreateIoCompletionPort(
		hFile, // 要关联的设备对象
		hComplePort, // 要进行关联的完成端口
		1, // 完成键
		0 // 进行关联时不用传
		);

	// 上面两部可以通过下面一次调用完成
	/*HANDLE hComplePort = CreateIoCompletionPort(hFile, NULL, 1, si.dwNumberOfProcessors);*/

	// 创建等待完成端口通知的线程,可以只创建1个线程,
	// 为了更高的效率,这里创建了CPU个数2倍的线程
	for (int i = 0; i < si.dwNumberOfProcessors * 2;++i)
	{
		_beginthreadex(0, 0, WaitComplePortNotify, (void*)hComplePort, 0, 0);
	}


	// 1. 投递的任务完成后,会被系统添加到完成端口的完成队列里
	// 2. 完成端口等待线程队列有成员,并且已释放线程列表(正在忙的线程)
	// 没有达到完成端口允许运行的最大数量
	// 3. 从完成队列里取1个完成任务,从等待线程队列里取最上层线程,添加到已释放线程队列里,等待任务被处理,处理完后,从已释放线程列表跑到等待线程列表
	// 

	// 开始投递IO任务,通过对应的事件对象知道释放完成IO
	MyOVERLAPPED * v1 = new MyOVERLAPPED(100, 0);
	v1->nIndex = 1;
	ReadFile(hFile, v1->pBuff, 100, NULL, v1);

	MyOVERLAPPED * v2 = new MyOVERLAPPED(100, 200);
	v2->nIndex = 2;
	ReadFile(hFile, v2->pBuff, 100, NULL, v2);

	MyOVERLAPPED * v3 = new MyOVERLAPPED(100, 400);
	v3->nIndex = 3;
	ReadFile(hFile, v3->pBuff, 100, NULL, v3);

	MyOVERLAPPED * v4 = new MyOVERLAPPED(100, 600);
	v4->nIndex = 4;
	ReadFile(hFile, v4->pBuff, 100, NULL, v4);

	system("pause");

	// 两种退出所有和完成观看关联的线程的方法
	// 将一个IO完成状态发送到完成端口
	PostQueuedCompletionStatus(
		hComplePort,// 接收完成状态的完成端口句柄
		0, // 完成了多少个字节
		0, // 完成键
		0  // 重叠结构体变量地址
		);

	// 或直接关闭完成端口句柄
	CloseHandle(hComplePort);
	// 这时候线程内的PostQueuedCompletionStatus函数会返回false。
	// GetLastError会得到ERROR_INVALID_HANDLE
	// 线程就直达自己可以退出了

	system("pause");
	
	return 0;
}


MyOVERLAPPED::MyOVERLAPPED() :pBuff(nullptr)
{
	OVERLAPPED::hEvent = 0;
}

// 构造函数
MyOVERLAPPED::MyOVERLAPPED(int nIoSize, int nFileOffsetLow, int nFileOffsetHight)
{
	OVERLAPPED::hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);

	OVERLAPPED::Offset = nFileOffsetLow;
	OVERLAPPED::OffsetHigh = nFileOffsetHight;
	pBuff = new char[nIoSize];
}

MyOVERLAPPED::~MyOVERLAPPED()
{
	if (pBuff != nullptr)
	{
		delete[] pBuff;
		pBuff = nullptr;
	}

	if (hEvent != NULL)
	{
		CloseHandle(hEvent);
	}
}


unsigned int __stdcall WaitComplePortNotify(void * pArg)
{
	HANDLE hComplePort = (HANDLE)pArg;
	DWORD dwCompleSize = 0; // IO 任务完成的字节数
	DWORD dwCompleKey = 0; // 完成键,在这里没啥用
	MyOVERLAPPED * pMyOverlapped = NULL;
	BOOL bRet = FALSE;

	while (true)
	{
		// 等待IO任务被投递到完成队列
		bRet = GetQueuedCompletionStatus(hComplePort, &dwCompleSize, &dwCompleKey, (OVERLAPPED**)&pMyOverlapped,
			INFINITE);
		// 通知线程退出信息
		if (dwCompleKey == 0)
		{
			// 完成读写,可以关闭所有线程了
			// 将一个IO完成状态发送到完成端口
			PostQueuedCompletionStatus(hComplePort, 0, 0, 0);
			printf("线程退出");
			return 0;
		}

		// 判断是否真的获取到了已经完成的IO任务
		if (bRet == TRUE && pMyOverlapped != NULL)
		{
			printf("[%d]IO任务完成,读取字节:%d 读取位置:%d,读取内容[%s]\n",pMyOverlapped->nIndex,pMyOverlapped->InternalHigh,pMyOverlapped->Offset,pMyOverlapped->pBuff);

		}
	}
}

 

文 / luna
LEAVE A REPLY

loading