• 主页
  • 相册
  • 随笔
  • 目录
  • 存档
Total 244
Search AboutMe

  • 主页
  • 相册
  • 随笔
  • 目录
  • 存档

实验:读写者问题

2019-11-15

内容

  • c语言解决读写者问题

不规范警告:统一用临界区实现

1. 基本概念

同步

  • 并发进程(线程)按一定规则或顺序共享系统资源

  • 实现方法

    • 硬件同步
    • 信号量同步
    • 管程
    1
    2
    3
    4
    5
    6
    7
    8
    //在指定的时间内等待多个对象为可用状态
    // 阻塞队列
    DWORD WaitForMultipleObjects(
    DWORD nCount,
    const HANDLE *lpHandles,
    BOOL bWaitAll,
    DWORD dwMilliseconds
    );

    The WaitForMultipleObjects function determines whether the wait criteria have been met. If the criteria have not been met, the calling thread enters the wait state until the conditions of the wait criteria have been met or the time-out interval elapses.

    When bWaitAll is TRUE, the function’s wait operation is completed only when the states of all objects have been set to signaled. The function does not modify the states of the specified objects until the states of all objects have been set to signaled. For example, a mutex can be signaled, but the thread does not get ownership until the states of the other objects are also set to signaled. In the meantime, some other thread may get ownership of the mutex, thereby setting its state to nonsignaled.

    When bWaitAll is FALSE, this function checks the handles in the array in order starting with index 0, until one of the objects is signaled. If multiple objects become signaled, the function returns the index of the first handle in the array whose object was signaled.

1
2
3
4
DWORD WaitForSingleObject(
HANDLE hHandle, // A handle to the object
DWORD dwMilliseconds // The time-out interval, in milliseconds.
);

The WaitForSingleObject function checks the current state of the specified object. If the object’s state is nonsignaled, the calling thread enters the wait state until the object is signaled or the time-out interval elapses.

临界区(Critical Section)

  • 每个进程访问临界资源(共享资源)的那段代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    //对临界区对象进行初始化
    InitializeCriticalSection();

    // 等待占用临界区的使用权,得到使用权时返回
    // wait()
    EnterCriticalSection();

    // 释放临界区的使用权
    // signal()
    LeaveCriticalSection();

信号量(Semaphore)

  • 整型信号量:用于表示资源数目的整型量S,并定义P,V操作

    信号量是通过计数来对线程访问资源进行控制的,而实际上信号量确实也被称作Dijkstra计数器

    1
    2
    3
    4
    5
    6
    7
    8
    wait(S)
    {
    while(S<=0); // 忙等
    S--;
    signal(S)
    {
    S++;
    }
    1
    2
    3
    4
    5
    6
    //信号量对象的取值在0到指定最大值之间,用于限制并发访问的线程数
    //创建一个信号量对象
    CreateSemaphore();

    //释放对信号量对象的占用
    ReleaseSemaphore();
  • AND型信号量

    • AND型信号量集是指同时需要多个资源且每种占用一个资源时的信号量操作

    • 在一个原语中申请整段代码需要的多个临界资源,要么全部分配给它,要么一个都不分配给它

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      Swait(S1, S2, …, Sn)
      {
      if ( Si≥1 and … and Sn≥1
      {
      for (i =1;i<=n;i++)
      {
      Si--;
      }
      }
      else
      {
      //place the process in the waiting queue associated with the first Si found with Si<1, and set the program counter of this process to the beginning of Swait operation
      }
      }


      Ssignal(S1, S2, …, Sn)
      {
      for (i =1;i<=n;i++)
      {
      Si++;
      //Remove all the process waiting in the queue associated with Si into the ready queue.
      }
      }
    • 信号量机制

      • 一般情况:Swait(S1, t1, d1, …, Sn, tn, dn)。特殊情况👇
      • Swait(S, d, d)。 此时在信号量集中只有一 个信号量S, 但允许它每次申请d个资源,当现有资源数少于d时,不予分配
      • Swait(S, 1, 1)。 此时的信号量集已蜕化为 一般的记录型信号量(S>1时)或互斥信号量(S=1 时)。
      • Swait(S, 1, 0)。这是一种很特殊且很有用的信号量操作。当S≥1时,允许多个进程进入某特定区;当S变为0后,将阻止任何进程进入特定区。 换言之,它相当于一个可控开关

互斥量(锁)

  • 互斥:是指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的

  • 互斥信号量:$mutex\in(-1,0,1)$

    • 将临界资源CS置于wait(S)与signal(S)之间。当一个程序试图访问CS:
      • $mutex=1$,获得访问权
      • $mutex=0$,表示有一个进程进入临界区运行,另一个程序需要挂起,挂入阻塞队列
      • $mutex=-1$,表示有一个进程正在临界区运行的,另外一个进程因等待而阻塞在信号量队列中,需要被当前已在临界区运行的进程退出时唤醒
    1
    2
    3
    4
    //创建一个互斥对象,返回对象句柄
    CreateMutex();
    //释放对互斥对象的占用,使之成为可用
    ReleaseMutex();
    • Remark:一般当作$mutex\in(0,1)$

2. 区分与联系

  • 信号量与互斥量
    • 互斥量用于线程的互斥,信号线用于线程的同步
    • 互斥量值只能为0/1,信号量值可以为非负整数
    • 互斥量的加锁和解锁必须由同一线程分别对应使用,信号量可以由一个线程释放,另一个线程得到
  • 临界区与互斥量
    • 互斥量与临界区的作用非常相似,但互斥量是可以命名的,也就是说它可以跨越进程使用,所以创建互斥量需要的资源更多
  • 互斥量与信号量与事件(Event)
    • 都可以被跨越进程使用来进行同步数据操作,而其他的对象与数据同步操作无关

3. 读写者

  • 写写互斥
  • 读写互斥
  • 读读同时

控制

  • readcount:读进程计数
  • rmutex:修改readcount的互斥量
  • wmutex:是否允许写的信号量

4. 实现

读写者问题本应用信号量与互斥量,但本文统一运用临界区实现<_<

当然你可以很轻易地 改为正确的

以下给出一个辅助示例👇

5. 生产者-消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
CRITICAL_SECTION sem_mutex; 
HANDLE sem_full;
HANDLE sem_avail;

void Thread_Producter(void *p)
{

DWORD m_delay;
DWORD m_persist;
int m_serial;

//读参数
m_serial = ((ThreadInfo*)(p))->serial;
m_delay = (DWORD)(((ThreadInfo*)(p))->delay*INTE_PER_SEC);
m_persist = (DWORD)(((ThreadInfo*)(p))->persist*INTE_PER_SEC);
while (true)
{
Sleep(m_delay);
printf("P thread %d send the P require\n",m_serial);
WaitForSingleObject(sem_avail,INFINITE);
EnterCriticalSection(&sem_mutex);
printf("P thread %d Begin to P\n",m_serial);
Sleep(m_persist);
printf("P thread %d Finish P.\n",m_serial);
ReleaseSemaphore(sem_full,1,NULL);
LeaveCriticalSection(&sem_mutex);
}

}

6. 读写者

测试数据rw_data:

1
2
3
4
5
6
7
1 R 1 6
2 R 2 5
3 R 3 4
4 R 4 3
5 R 5 2
6 W 2 1
7 W 3 2

c++实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#include"windows.h"
#include<conio.h>
#include<stdlib.h>
#include<fstream>
#include<io.h>
#include<string.h>
#include<stdio.h>

#define INTE_PER_SEC 100
#define MAX_THREAD_NUM 64
#define SEMMAX_FULL 64
#define READER 'R'
#define WRITER 'W'

using namespace std;

struct ThreadInfo
{
int serial; //序号
char entity;//动作实体
double delay;//延迟
double persist;//持续
};

CRITICAL_SECTION w_mutex, r_mutex;

int readcount = 0;

void Initialize(const char* file);
void Thread_Read(void* p);
void Thread_Write(void* p);

void Initialize(const char* file)
{
DWORD n_thread = 0;
DWORD thread_ID;

HANDLE h_Thread[MAX_THREAD_NUM];
ThreadInfo thread_info[MAX_THREAD_NUM];

ifstream in;
in.open(file);
puts("Read Data File \n");
while (in)
{
in >> thread_info[n_thread].serial;
in >> thread_info[n_thread].entity;
in >> thread_info[n_thread].delay;
in >> thread_info[n_thread].persist;
n_thread++;
in.get();
}

for (int i = 0; i < (int)(n_thread); i++)
{
if (thread_info[i].entity == READER)
h_Thread[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)(Thread_Read), &thread_info[i], 0, &thread_ID);
else if (thread_info[i].entity == WRITER)
h_Thread[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)(Thread_Write), &thread_info[i], 0, &thread_ID);
else
{
puts("Bad File\n");
exit(0);
}
}

InitializeCriticalSection(&r_mutex);
InitializeCriticalSection(&w_mutex);
WaitForMultipleObjects(n_thread, h_Thread, TRUE, -1);
printf("Task is Finished!\n");
}
void Thread_Read(void* p)
{
int m_serial = ((ThreadInfo*)(p))->serial;
DWORD m_delay = (DWORD)(((ThreadInfo*)(p))->delay);
DWORD m_persist = (DWORD)(((ThreadInfo*)(p))->persist);
while (TRUE)
{
Sleep(m_delay);
printf("R thread %d send the R require\n", m_serial);
EnterCriticalSection(&r_mutex);
if (readcount == 0)
{
EnterCriticalSection(&w_mutex);

}
printf("R thread %d Begin to Read\n", m_serial);
readcount++;
LeaveCriticalSection(&r_mutex);

/* perform read operation */
Sleep(m_persist);
printf("R thread %d finish Reading\n", m_serial);

EnterCriticalSection(&r_mutex);
readcount--;
if (readcount == 0)
{
LeaveCriticalSection(&w_mutex);
}
LeaveCriticalSection(&r_mutex);
}

}

void Thread_Write(void* p)
{
int m_serial = ((ThreadInfo*)(p))->serial;
DWORD m_delay = (DWORD)(((ThreadInfo*)(p))->delay);
DWORD m_persist = (DWORD)(((ThreadInfo*)(p))->persist);
while (TRUE)
{
Sleep(m_delay);
printf("W thread %d send the W require\n", m_serial);
EnterCriticalSection(&w_mutex);
printf("W thread %d Begin to Write\n", m_serial);

/* perform read operation */
Sleep(m_persist);
printf("W thread %d finish Writing\n", m_serial);

LeaveCriticalSection(&w_mutex);
}

}
int main()
{
Initialize("rw_data.txt");

}

7. 结果展示

  • 读者重复读

  • 读写互斥

  • 写写互斥

8. 常见问题解决

  • 将读线程的m_persist设得过大,会出现写饿死

  • 无限循环

    对于进程和线程来讲,如果进程和线程在运行状态则为无信号状态(nonsignaled),在退出后为有信号状态(signaled)

    • 对任一线程对象,通过SetEvent()或执行结束来标记signaled。而在本例中,由于While(True)使得任一线程都无法标记signaled。

    • 因此,可以通过注释While(True),来使每一线程只执行一次。

9. 进阶

基于以上,进一步实现读写者问题的写者优先

10. 原理

何为“优先”

  • 当优先级低的进程获得临界区时,高优先级进程能从低优先级进程中抢得临界区的访问权

  • 当优先级高的进程在访问临界区时,低优先级进程必须等待,直到高优先级全部访问完才有机会访问临界区

临界区

rcnt、wcnt:读、写者队列挂起数目(等待进程数)

re:读者写者同步、写者优先

fi:写写互斥

rmutex:rcnt互斥

wmutex:wcnt互斥

11. 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/*
*Target:读写者问题的写者优先实现
*Author:github:@purplezi
*Date:2019-11-22
*/
int wcnt = 0, rcnt = 0; // 记录读者和写者的数量
CRITICAL_SECTION rmutex, wmutex; // 用于读写者互斥修改rcnt和wcnt
CRITICAL_SECTION fi; // 用于读写者互斥访问文件
CRITICAL_SECTION re; // 用于阻塞读者读


void Reader_Writer(char* file) {
DWORD n_thread = 0;
DWORD thread_ID;

HANDLE h_thread[MAX_THREAD_NUM];
ThreadInfo thread_info[MAX_THREAD_NUM];

fstream InFile;
InFile.open(file);
puts("Read Data File\n");

while (InFile) {
InFile >> thread_info[n_thread].serial;
InFile >> thread_info[n_thread].entity;
InFile >> thread_info[n_thread].delay;
InFile >> thread_info[n_thread++].persist;
InFile.get();
}

for (int i = 0; i < (int)n_thread; i++) {
if (thread_info[i].entity == Writer) {
h_thread[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)(Thread_Writer), &thread_info[i], 0, &thread_ID);
}
else if (thread_info[i].entity == Reader) {
h_thread[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)(Thread_Reader), &thread_info[i], 0, &thread_ID);
}
else {
puts("Bad File\n");
exit(0);
}
}

InitializeCriticalSection(&rmutex);
InitializeCriticalSection(&wmutex);
InitializeCriticalSection(&fi);
InitializeCriticalSection(&re);

WaitForMultipleObjects(n_thread, h_thread, TRUE, -1);
printf("Task is finished!\n");
_getch();

}
void Thread_Reader(void* p) {
int r_serial, r_delay, r_persist;

r_serial = ((ThreadInfo*)(p))->serial;
r_delay = ((ThreadInfo*)(p))->delay;
r_persist = ((ThreadInfo*)(p))->persist;

while (true) {
Sleep(r_delay);
printf("R thread %d send a require\n", r_serial);
// 检查写者队列是否为空
EnterCriticalSection(&re);
//为空才能继续挂读者
EnterCriticalSection(&rmutex);
if (rcnt == 0)EnterCriticalSection(&fi);
rcnt++;

LeaveCriticalSection(&rmutex);

LeaveCriticalSection(&re);

printf("R thread %d begin to read\n", r_serial);
Sleep(r_persist);
printf("R thread %d finish!\n", r_serial);

EnterCriticalSection(&rmutex);
rcnt--;
if (rcnt == 0)LeaveCriticalSection(&fi);
LeaveCriticalSection(&rmutex);
}
}

void Thread_Writer(void* p) {
int w_serial, w_delay, w_persist;

w_serial = ((ThreadInfo*)(p))->serial;
w_delay = ((ThreadInfo*)(p))->delay;
w_persist = ((ThreadInfo*)(p))->persist;

while (true) {
Sleep(w_delay);
printf("W thread %d send a require\n", w_serial);

EnterCriticalSection(&wmutex);
if (wcnt == 0)EnterCriticalSection(&re);
wcnt++;
LeaveCriticalSection(&wmutex);

// 注意此处加了fi锁来控制
EnterCriticalSection(&fi);
printf("W thread %d begin to write\n", w_serial);
Sleep(w_persist);
printf("W thread %d finish!\n", w_serial);
LeaveCriticalSection(&fi);

EnterCriticalSection(&wmutex);
wcnt--;
// 如果wcnt不为0,说明还有写进程在等待序列,因此要优先写进程,不能放re锁
if (wcnt == 0)LeaveCriticalSection(&re);
LeaveCriticalSection(&wmutex);
}
}

特别鸣谢: purplezi

12. 参考资料

操作系统——读者写者问题详解 - 起风了… - CSDN博客

信号量和互斥量(锁)的区别 - fuqiaoyimeng的专栏 - CSDN博客

操作系统PV操作及读者写者问题 - Remoa的休闲茶馆 - CSDN博客

Critical Section,Mutex,Semaphore,Event区别 - shuixin536的专栏 - CSDN博客

  • Lab
  • Operating System
  • Lab
网络端口扫描
HTTP代理服务器实验
© 2024 何决云 载入天数...