The Algorithms logo
算法
关于我们捐赠

多级反馈队列

R
from collections import deque


class Process:
    def __init__(self, process_name: str, arrival_time: int, burst_time: int) -> None:
        self.process_name = process_name  # process name
        self.arrival_time = arrival_time  # arrival time of the process
        # completion time of finished process or last interrupted time
        self.stop_time = arrival_time
        self.burst_time = burst_time  # remaining burst time
        self.waiting_time = 0  # total time of the process wait in ready queue
        self.turnaround_time = 0  # time from arrival time to completion time


class MLFQ:
    """
    MLFQ(Multi Level Feedback Queue)
    https://en.wikipedia.org/wiki/Multilevel_feedback_queue
    MLFQ has a lot of queues that have different priority
    In this MLFQ,
    The first Queue(0) to last second Queue(N-2) of MLFQ have Round Robin Algorithm
    The last Queue(N-1) has First Come, First Served Algorithm
    """

    def __init__(
        self,
        number_of_queues: int,
        time_slices: list[int],
        queue: deque[Process],
        current_time: int,
    ) -> None:
        # total number of mlfq's queues
        self.number_of_queues = number_of_queues
        # time slice of queues that round robin algorithm applied
        self.time_slices = time_slices
        # unfinished process is in this ready_queue
        self.ready_queue = queue
        # current time
        self.current_time = current_time
        # finished process is in this sequence queue
        self.finish_queue: deque[Process] = deque()

    def calculate_sequence_of_finish_queue(self) -> list[str]:
        """
        This method returns the sequence of finished processes
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> _ = mlfq.multi_level_feedback_queue()
        >>> mlfq.calculate_sequence_of_finish_queue()
        ['P2', 'P4', 'P1', 'P3']
        """
        sequence = []
        for i in range(len(self.finish_queue)):
            sequence.append(self.finish_queue[i].process_name)
        return sequence

    def calculate_waiting_time(self, queue: list[Process]) -> list[int]:
        """
        This method calculates waiting time of processes
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> _ = mlfq.multi_level_feedback_queue()
        >>> mlfq.calculate_waiting_time([P1, P2, P3, P4])
        [83, 17, 94, 101]
        """
        waiting_times = []
        for i in range(len(queue)):
            waiting_times.append(queue[i].waiting_time)
        return waiting_times

    def calculate_turnaround_time(self, queue: list[Process]) -> list[int]:
        """
        This method calculates turnaround time of processes
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> _ = mlfq.multi_level_feedback_queue()
        >>> mlfq.calculate_turnaround_time([P1, P2, P3, P4])
        [136, 34, 162, 125]
        """
        turnaround_times = []
        for i in range(len(queue)):
            turnaround_times.append(queue[i].turnaround_time)
        return turnaround_times

    def calculate_completion_time(self, queue: list[Process]) -> list[int]:
        """
        This method calculates completion time of processes
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> _ = mlfq.multi_level_feedback_queue()
        >>> mlfq.calculate_turnaround_time([P1, P2, P3, P4])
        [136, 34, 162, 125]
        """
        completion_times = []
        for i in range(len(queue)):
            completion_times.append(queue[i].stop_time)
        return completion_times

    def calculate_remaining_burst_time_of_processes(
        self, queue: deque[Process]
    ) -> list[int]:
        """
        This method calculate remaining burst time of processes
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> finish_queue, ready_queue = mlfq.round_robin(deque([P1, P2, P3, P4]), 17)
        >>> mlfq.calculate_remaining_burst_time_of_processes(mlfq.finish_queue)
        [0]
        >>> mlfq.calculate_remaining_burst_time_of_processes(ready_queue)
        [36, 51, 7]
        >>> finish_queue, ready_queue = mlfq.round_robin(ready_queue, 25)
        >>> mlfq.calculate_remaining_burst_time_of_processes(mlfq.finish_queue)
        [0, 0]
        >>> mlfq.calculate_remaining_burst_time_of_processes(ready_queue)
        [11, 26]
        """
        return [q.burst_time for q in queue]

    def update_waiting_time(self, process: Process) -> int:
        """
        This method updates waiting times of unfinished processes
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> mlfq.current_time = 10
        >>> P1.stop_time = 5
        >>> mlfq.update_waiting_time(P1)
        5
        """
        process.waiting_time += self.current_time - process.stop_time
        return process.waiting_time

    def first_come_first_served(self, ready_queue: deque[Process]) -> deque[Process]:
        """
        FCFS(First Come, First Served)
        FCFS will be applied to MLFQ's last queue
        A first came process will be finished at first
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> _ = mlfq.first_come_first_served(mlfq.ready_queue)
        >>> mlfq.calculate_sequence_of_finish_queue()
        ['P1', 'P2', 'P3', 'P4']
        """
        finished: deque[Process] = deque()  # sequence deque of finished process
        while len(ready_queue) != 0:
            cp = ready_queue.popleft()  # current process

            # if process's arrival time is later than current time, update current time
            if self.current_time < cp.arrival_time:
                self.current_time += cp.arrival_time

            # update waiting time of current process
            self.update_waiting_time(cp)
            # update current time
            self.current_time += cp.burst_time
            # finish the process and set the process's burst-time 0
            cp.burst_time = 0
            # set the process's turnaround time because it is finished
            cp.turnaround_time = self.current_time - cp.arrival_time
            # set the completion time
            cp.stop_time = self.current_time
            # add the process to queue that has finished queue
            finished.append(cp)

        self.finish_queue.extend(finished)  # add finished process to finish queue
        # FCFS will finish all remaining processes
        return finished

    def round_robin(
        self, ready_queue: deque[Process], time_slice: int
    ) -> tuple[deque[Process], deque[Process]]:
        """
        RR(Round Robin)
        RR will be applied to MLFQ's all queues except last queue
        All processes can't use CPU for time more than time_slice
        If the process consume CPU up to time_slice, it will go back to ready queue
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> finish_queue, ready_queue = mlfq.round_robin(mlfq.ready_queue, 17)
        >>> mlfq.calculate_sequence_of_finish_queue()
        ['P2']
        """
        finished: deque[Process] = deque()  # sequence deque of terminated process
        # just for 1 cycle and unfinished processes will go back to queue
        for _ in range(len(ready_queue)):
            cp = ready_queue.popleft()  # current process

            # if process's arrival time is later than current time, update current time
            if self.current_time < cp.arrival_time:
                self.current_time += cp.arrival_time

            # update waiting time of unfinished processes
            self.update_waiting_time(cp)
            # if the burst time of process is bigger than time-slice
            if cp.burst_time > time_slice:
                # use CPU for only time-slice
                self.current_time += time_slice
                # update remaining burst time
                cp.burst_time -= time_slice
                # update end point time
                cp.stop_time = self.current_time
                # locate the process behind the queue because it is not finished
                ready_queue.append(cp)
            else:
                # use CPU for remaining burst time
                self.current_time += cp.burst_time
                # set burst time 0 because the process is finished
                cp.burst_time = 0
                # set the finish time
                cp.stop_time = self.current_time
                # update the process' turnaround time because it is finished
                cp.turnaround_time = self.current_time - cp.arrival_time
                # add the process to queue that has finished queue
                finished.append(cp)

        self.finish_queue.extend(finished)  # add finished process to finish queue
        # return finished processes queue and remaining processes queue
        return finished, ready_queue

    def multi_level_feedback_queue(self) -> deque[Process]:
        """
        MLFQ(Multi Level Feedback Queue)
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> finish_queue = mlfq.multi_level_feedback_queue()
        >>> mlfq.calculate_sequence_of_finish_queue()
        ['P2', 'P4', 'P1', 'P3']
        """

        #  all queues except last one have round_robin algorithm
        for i in range(self.number_of_queues - 1):
            finished, self.ready_queue = self.round_robin(
                self.ready_queue, self.time_slices[i]
            )
        #  the last queue has first_come_first_served algorithm
        self.first_come_first_served(self.ready_queue)

        return self.finish_queue


if __name__ == "__main__":
    import doctest

    P1 = Process("P1", 0, 53)
    P2 = Process("P2", 0, 17)
    P3 = Process("P3", 0, 68)
    P4 = Process("P4", 0, 24)
    number_of_queues = 3
    time_slices = [17, 25]
    queue = deque([P1, P2, P3, P4])

    if len(time_slices) != number_of_queues - 1:
        raise SystemExit(0)

    doctest.testmod(extraglobs={"queue": deque([P1, P2, P3, P4])})

    P1 = Process("P1", 0, 53)
    P2 = Process("P2", 0, 17)
    P3 = Process("P3", 0, 68)
    P4 = Process("P4", 0, 24)
    number_of_queues = 3
    time_slices = [17, 25]
    queue = deque([P1, P2, P3, P4])
    mlfq = MLFQ(number_of_queues, time_slices, queue, 0)
    finish_queue = mlfq.multi_level_feedback_queue()

    # print total waiting times of processes(P1, P2, P3, P4)
    print(
        f"waiting time:\
        \t\t\t{MLFQ.calculate_waiting_time(mlfq, [P1, P2, P3, P4])}"
    )
    # print completion times of processes(P1, P2, P3, P4)
    print(
        f"completion time:\
        \t\t{MLFQ.calculate_completion_time(mlfq, [P1, P2, P3, P4])}"
    )
    # print total turnaround times of processes(P1, P2, P3, P4)
    print(
        f"turnaround time:\
        \t\t{MLFQ.calculate_turnaround_time(mlfq, [P1, P2, P3, P4])}"
    )
    # print sequence of finished processes
    print(
        f"sequence of finished processes:\
        {mlfq.calculate_sequence_of_finish_queue()}"
    )