Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Z
Zope
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
Zope
Commits
1d9f3e9e
Commit
1d9f3e9e
authored
Jan 18, 1999
by
Amos Latteier
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
added new medusa modules.
parent
4c454455
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
934 additions
and
0 deletions
+934
-0
ZServer/medusa/filesys.py
ZServer/medusa/filesys.py
+467
-0
lib/python/ZServer/medusa/filesys.py
lib/python/ZServer/medusa/filesys.py
+467
-0
No files found.
ZServer/medusa/filesys.py
0 → 100644
View file @
1d9f3e9e
# -*- Mode: Python; tab-width: 4 -*-
# $Id: filesys.py,v 1.1 1999/01/18 22:44:21 amos Exp $
# Author: Sam Rushing <rushing@nightmare.com>
#
# Generic filesystem interface.
#
# We want to provide a complete wrapper around any and all
# filesystem operations.
# this class is really just for documentation,
# identifying the API for a filesystem object.
# opening files for reading, and listing directories, should
# return a producer.
class
abstract_filesystem
:
def
__init__
(
self
):
pass
def
current_directory
(
self
):
"Return a string representing the current directory."
pass
def
listdir
(
self
,
path
,
long
=
0
):
"""Return a listing of the directory at 'path' The empty string
indicates the current directory. If 'long' is set, instead
return a list of (name, stat_info) tuples
"""
pass
def
open
(
self
,
path
,
mode
):
"Return an open file object"
pass
def
stat
(
self
,
path
):
"Return the equivalent of os.stat() on the given path."
pass
def
isdir
(
self
,
path
):
"Does the path represent a directory?"
pass
def
isfile
(
self
,
path
):
"Does the path represent a plain file?"
pass
def
cwd
(
self
,
path
):
"Change the working directory."
pass
def
cdup
(
self
):
"Change to the parent of the current directory."
pass
def
longify
(
self
,
path
):
"""Return a 'long' representation of the filename
[for the output of the LIST command]"""
pass
# standard wrapper around a unix-like filesystem, with a 'false root'
# capability.
# security considerations: can symbolic links be used to 'escape' the
# root? should we allow it? if not, then we could scan the
# filesystem on startup, but that would not help if they were added
# later. We will probably need to check for symlinks in the cwd method.
# what to do if wd is an invalid directory?
import
os
import
stat
import
string
def
safe_stat
(
path
):
try
:
return
(
path
,
os
.
stat
(
path
))
except
:
return
None
import
regex
import
regsub
import
glob
class
os_filesystem
:
path_module
=
os
.
path
# set this to zero if you want to disable pathname globbing.
# [we currently don't glob, anyway]
do_globbing
=
1
def
__init__
(
self
,
root
,
wd
=
'/'
):
self
.
root
=
root
self
.
wd
=
wd
def
current_directory
(
self
):
return
self
.
wd
def
isfile
(
self
,
path
):
p
=
self
.
normalize
(
self
.
path_module
.
join
(
self
.
wd
,
path
))
return
self
.
path_module
.
isfile
(
self
.
translate
(
p
))
def
isdir
(
self
,
path
):
p
=
self
.
normalize
(
self
.
path_module
.
join
(
self
.
wd
,
path
))
return
self
.
path_module
.
isdir
(
self
.
translate
(
p
))
def
cwd
(
self
,
path
):
p
=
self
.
normalize
(
self
.
path_module
.
join
(
self
.
wd
,
path
))
translated_path
=
self
.
translate
(
p
)
if
not
self
.
path_module
.
isdir
(
translated_path
):
return
0
else
:
old_dir
=
os
.
getcwd
()
# temporarily change to that directory, in order
# to see if we have permission to do so.
try
:
can
=
0
try
:
os
.
chdir
(
translated_path
)
can
=
1
self
.
wd
=
p
except
:
pass
finally
:
if
can
:
os
.
chdir
(
old_dir
)
return
can
def
cdup
(
self
):
return
self
.
cwd
(
'..'
)
def
listdir
(
self
,
path
,
long
=
0
):
p
=
self
.
translate
(
path
)
# I think we should glob, but limit it to the current
# directory only.
ld
=
os
.
listdir
(
p
)
if
not
long
:
return
list_producer
(
ld
,
0
,
None
)
else
:
old_dir
=
os
.
getcwd
()
try
:
os
.
chdir
(
p
)
# if os.stat fails we ignore that file.
result
=
filter
(
None
,
map
(
safe_stat
,
ld
))
finally
:
os
.
chdir
(
old_dir
)
return
list_producer
(
result
,
1
,
self
.
longify
)
# TODO: implement a cache w/timeout for stat()
def
stat
(
self
,
path
):
p
=
self
.
translate
(
path
)
return
os
.
stat
(
p
)
def
open
(
self
,
path
,
mode
):
p
=
self
.
translate
(
path
)
return
open
(
p
,
mode
)
def
unlink
(
self
,
path
):
p
=
self
.
translate
(
path
)
return
os
.
unlink
(
p
)
def
mkdir
(
self
,
path
):
p
=
self
.
translate
(
path
)
return
os
.
mkdir
(
p
)
def
rmdir
(
self
,
path
):
p
=
self
.
translate
(
path
)
return
os
.
rmdir
(
p
)
# utility methods
def
normalize
(
self
,
path
):
# watch for the ever-sneaky '/+' path element
path
=
regsub
.
gsub
(
'/+'
,
'/'
,
path
)
p
=
self
.
path_module
.
normpath
(
path
)
# remove 'dangling' cdup's.
if
len
(
p
)
>
2
and
p
[:
3
]
==
'/..'
:
p
=
'/'
return
p
def
translate
(
self
,
path
):
# we need to join together three separate
# path components, and do it safely.
# <real_root>/<current_directory>/<path>
# use the operating system's path separator.
path
=
string
.
join
(
string
.
split
(
path
,
'/'
),
os
.
sep
)
p
=
self
.
normalize
(
self
.
path_module
.
join
(
self
.
wd
,
path
))
p
=
self
.
normalize
(
self
.
path_module
.
join
(
self
.
root
,
p
[
1
:]))
return
p
def
longify
(
self
,
(
path
,
stat_info
)):
return
unix_longify
(
path
,
stat_info
)
def
__repr__
(
self
):
return
'<unix-style fs root:%s wd:%s>'
%
(
self
.
root
,
self
.
wd
)
if
os
.
name
==
'posix'
:
class
unix_filesystem
(
os_filesystem
):
pass
class
schizophrenic_unix_filesystem
(
os_filesystem
):
PROCESS_UID
=
os
.
getuid
()
PROCESS_EUID
=
os
.
geteuid
()
PROCESS_GID
=
os
.
getgid
()
PROCESS_EGID
=
os
.
getegid
()
def
__init__
(
self
,
root
,
wd
=
'/'
,
persona
=
(
None
,
None
)):
os_filesystem
.
__init__
(
self
,
root
,
wd
)
self
.
persona
=
persona
def
become_persona
(
self
):
if
self
.
persona
is
not
(
None
,
None
):
uid
,
gid
=
self
.
persona
# the order of these is important!
os
.
setegid
(
gid
)
os
.
seteuid
(
uid
)
def
become_nobody
(
self
):
if
self
.
persona
is
not
(
None
,
None
):
os
.
seteuid
(
self
.
PROCESS_UID
)
os
.
setegid
(
self
.
PROCESS_GID
)
# cwd, cdup, open, listdir
def
cwd
(
self
,
path
):
try
:
self
.
become_persona
()
return
os_filesystem
.
cwd
(
self
,
path
)
finally
:
self
.
become_nobody
()
def
cdup
(
self
,
path
):
try
:
self
.
become_persona
()
return
os_filesystem
.
cdup
(
self
)
finally
:
self
.
become_nobody
()
def
open
(
self
,
filename
,
mode
):
try
:
self
.
become_persona
()
return
os_filesystem
.
open
(
self
,
filename
,
mode
)
finally
:
self
.
become_nobody
()
def
listdir
(
self
,
path
,
long
=
0
):
try
:
self
.
become_persona
()
return
os_filesystem
.
listdir
(
self
,
path
,
long
)
finally
:
self
.
become_nobody
()
# This hasn't been very reliable across different platforms.
# maybe think about a separate 'directory server'.
#
# import posixpath
# import fcntl
# import FCNTL
# import select
# import asyncore
#
# # pipes /bin/ls for directory listings.
# class unix_filesystem (os_filesystem):
# pass
# path_module = posixpath
#
# def listdir (self, path, long=0):
# p = self.translate (path)
# if not long:
# return list_producer (os.listdir (p), 0, None)
# else:
# command = '/bin/ls -l %s' % p
# print 'opening pipe to "%s"' % command
# fd = os.popen (command, 'rt')
# return pipe_channel (fd)
#
# # this is both a dispatcher, _and_ a producer
# class pipe_channel (asyncore.file_dispatcher):
# buffer_size = 4096
#
# def __init__ (self, fd):
# asyncore.file_dispatcher.__init__ (self, fd)
# self.fd = fd
# self.done = 0
# self.data = ''
#
# def handle_read (self):
# if len (self.data) < self.buffer_size:
# self.data = self.data + self.fd.read (self.buffer_size)
# #print '%s.handle_read() => len(self.data) == %d' % (self, len(self.data))
#
# def handle_expt (self):
# #print '%s.handle_expt()' % self
# self.done = 1
#
# def ready (self):
# #print '%s.ready() => %d' % (self, len(self.data))
# return ((len (self.data) > 0) or self.done)
#
# def more (self):
# if self.data:
# r = self.data
# self.data = ''
# elif self.done:
# self.close()
# self.downstream.finished()
# r = ''
# else:
# r = None
# #print '%s.more() => %s' % (self, (r and len(r)))
# return r
# For the 'real' root, we could obtain a list of drives, and then
# use that. Doesn't win32 provide such a 'real' filesystem?
# [yes, I think something like this "\\.\c\windows"]
class
msdos_filesystem
(
os_filesystem
):
def
longify
(
self
,
(
path
,
stat_info
)):
return
msdos_longify
(
path
,
stat_info
)
# A merged filesystem will let you plug other filesystems together.
# We really need the equivalent of a 'mount' capability - this seems
# to be the most general idea. So you'd use a 'mount' method to place
# another filesystem somewhere in the hierarchy.
# Note: this is most likely how I will handle ~user directories
# with the http server.
class
merged_filesystem
:
def
__init__
(
self
,
*
fsys
):
pass
# this matches the output of NT's ftp server (when in
# MSDOS mode) exactly.
def
msdos_longify
(
file
,
stat_info
):
if
stat
.
S_ISDIR
(
stat_info
[
stat
.
ST_MODE
]):
dir
=
'<DIR>'
else
:
dir
=
' '
date
=
msdos_date
(
stat_info
[
stat
.
ST_MTIME
])
return
'%s %s %8d %s'
%
(
date
,
dir
,
stat_info
[
stat
.
ST_SIZE
],
file
)
def
msdos_date
(
t
):
try
:
info
=
time
.
gmtime
(
t
)
except
:
info
=
time
.
gmtime
(
0
)
# year, month, day, hour, minute, second, ...
if
info
[
3
]
>
11
:
merid
=
'PM'
info
[
3
]
=
info
[
3
]
-
12
else
:
merid
=
'AM'
return
'%02d-%02d-%02d %02d:%02d%s'
%
(
info
[
1
],
info
[
2
],
info
[
0
]
%
100
,
info
[
3
],
info
[
4
],
merid
)
months
=
[
'Jan'
,
'Feb'
,
'Mar'
,
'Apr'
,
'May'
,
'Jun'
,
'Jul'
,
'Aug'
,
'Sep'
,
'Oct'
,
'Nov'
,
'Dec'
]
mode_table
=
{
'0'
:
'---'
,
'1'
:
'--x'
,
'2'
:
'-w-'
,
'3'
:
'-wx'
,
'4'
:
'r--'
,
'5'
:
'r-x'
,
'6'
:
'rw-'
,
'7'
:
'rwx'
}
import
time
def
unix_longify
(
file
,
stat_info
):
# for now, only pay attention to the lower bits
mode
=
(
'%o'
%
stat_info
[
stat
.
ST_MODE
])[
-
3
:]
mode
=
string
.
join
(
map
(
lambda
x
:
mode_table
[
x
],
mode
),
''
)
if
stat
.
S_ISDIR
(
stat_info
[
stat
.
ST_MODE
]):
dirchar
=
'd'
else
:
dirchar
=
'-'
date
=
ls_date
(
long
(
time
.
time
()),
stat_info
[
stat
.
ST_MTIME
])
return
'%s%s %3d %-8d %-8d %8d %s %s'
%
(
dirchar
,
mode
,
stat_info
[
stat
.
ST_NLINK
],
stat_info
[
stat
.
ST_UID
],
stat_info
[
stat
.
ST_GID
],
stat_info
[
stat
.
ST_SIZE
],
date
,
file
)
# Emulate the unix 'ls' command's date field.
# it has two formats - if the date is more than 180
# days in the past, then it's like this:
# Oct 19 1995
# otherwise, it looks like this:
# Oct 19 17:33
def
ls_date
(
now
,
t
):
try
:
info
=
time
.
gmtime
(
t
)
except
:
info
=
time
.
gmtime
(
0
)
# 15,600,000 == 86,400 * 180
if
(
now
-
t
)
>
15600000
:
return
'%s %2d %d'
%
(
months
[
info
[
1
]
-
1
],
info
[
2
],
info
[
0
]
)
else
:
return
'%s %2d %02d:%02d'
%
(
months
[
info
[
1
]
-
1
],
info
[
2
],
info
[
3
],
info
[
4
]
)
# ===========================================================================
# Producers
# ===========================================================================
class
list_producer
:
def
__init__
(
self
,
file_list
,
long
,
longify
):
self
.
file_list
=
file_list
self
.
long
=
long
self
.
longify
=
longify
self
.
done
=
0
def
ready
(
self
):
if
len
(
self
.
file_list
):
return
1
else
:
if
not
self
.
done
:
self
.
done
=
1
return
0
return
(
len
(
self
.
file_list
)
>
0
)
# this should do a pushd/popd
def
more
(
self
):
if
not
self
.
file_list
:
return
''
else
:
# do a few at a time
bunch
=
self
.
file_list
[:
50
]
if
self
.
long
:
bunch
=
map
(
self
.
longify
,
bunch
)
self
.
file_list
=
self
.
file_list
[
50
:]
return
string
.
joinfields
(
bunch
,
'
\
r
\
n
'
)
+
'
\
r
\
n
'
lib/python/ZServer/medusa/filesys.py
0 → 100644
View file @
1d9f3e9e
# -*- Mode: Python; tab-width: 4 -*-
# $Id: filesys.py,v 1.1 1999/01/18 22:44:21 amos Exp $
# Author: Sam Rushing <rushing@nightmare.com>
#
# Generic filesystem interface.
#
# We want to provide a complete wrapper around any and all
# filesystem operations.
# this class is really just for documentation,
# identifying the API for a filesystem object.
# opening files for reading, and listing directories, should
# return a producer.
class
abstract_filesystem
:
def
__init__
(
self
):
pass
def
current_directory
(
self
):
"Return a string representing the current directory."
pass
def
listdir
(
self
,
path
,
long
=
0
):
"""Return a listing of the directory at 'path' The empty string
indicates the current directory. If 'long' is set, instead
return a list of (name, stat_info) tuples
"""
pass
def
open
(
self
,
path
,
mode
):
"Return an open file object"
pass
def
stat
(
self
,
path
):
"Return the equivalent of os.stat() on the given path."
pass
def
isdir
(
self
,
path
):
"Does the path represent a directory?"
pass
def
isfile
(
self
,
path
):
"Does the path represent a plain file?"
pass
def
cwd
(
self
,
path
):
"Change the working directory."
pass
def
cdup
(
self
):
"Change to the parent of the current directory."
pass
def
longify
(
self
,
path
):
"""Return a 'long' representation of the filename
[for the output of the LIST command]"""
pass
# standard wrapper around a unix-like filesystem, with a 'false root'
# capability.
# security considerations: can symbolic links be used to 'escape' the
# root? should we allow it? if not, then we could scan the
# filesystem on startup, but that would not help if they were added
# later. We will probably need to check for symlinks in the cwd method.
# what to do if wd is an invalid directory?
import
os
import
stat
import
string
def
safe_stat
(
path
):
try
:
return
(
path
,
os
.
stat
(
path
))
except
:
return
None
import
regex
import
regsub
import
glob
class
os_filesystem
:
path_module
=
os
.
path
# set this to zero if you want to disable pathname globbing.
# [we currently don't glob, anyway]
do_globbing
=
1
def
__init__
(
self
,
root
,
wd
=
'/'
):
self
.
root
=
root
self
.
wd
=
wd
def
current_directory
(
self
):
return
self
.
wd
def
isfile
(
self
,
path
):
p
=
self
.
normalize
(
self
.
path_module
.
join
(
self
.
wd
,
path
))
return
self
.
path_module
.
isfile
(
self
.
translate
(
p
))
def
isdir
(
self
,
path
):
p
=
self
.
normalize
(
self
.
path_module
.
join
(
self
.
wd
,
path
))
return
self
.
path_module
.
isdir
(
self
.
translate
(
p
))
def
cwd
(
self
,
path
):
p
=
self
.
normalize
(
self
.
path_module
.
join
(
self
.
wd
,
path
))
translated_path
=
self
.
translate
(
p
)
if
not
self
.
path_module
.
isdir
(
translated_path
):
return
0
else
:
old_dir
=
os
.
getcwd
()
# temporarily change to that directory, in order
# to see if we have permission to do so.
try
:
can
=
0
try
:
os
.
chdir
(
translated_path
)
can
=
1
self
.
wd
=
p
except
:
pass
finally
:
if
can
:
os
.
chdir
(
old_dir
)
return
can
def
cdup
(
self
):
return
self
.
cwd
(
'..'
)
def
listdir
(
self
,
path
,
long
=
0
):
p
=
self
.
translate
(
path
)
# I think we should glob, but limit it to the current
# directory only.
ld
=
os
.
listdir
(
p
)
if
not
long
:
return
list_producer
(
ld
,
0
,
None
)
else
:
old_dir
=
os
.
getcwd
()
try
:
os
.
chdir
(
p
)
# if os.stat fails we ignore that file.
result
=
filter
(
None
,
map
(
safe_stat
,
ld
))
finally
:
os
.
chdir
(
old_dir
)
return
list_producer
(
result
,
1
,
self
.
longify
)
# TODO: implement a cache w/timeout for stat()
def
stat
(
self
,
path
):
p
=
self
.
translate
(
path
)
return
os
.
stat
(
p
)
def
open
(
self
,
path
,
mode
):
p
=
self
.
translate
(
path
)
return
open
(
p
,
mode
)
def
unlink
(
self
,
path
):
p
=
self
.
translate
(
path
)
return
os
.
unlink
(
p
)
def
mkdir
(
self
,
path
):
p
=
self
.
translate
(
path
)
return
os
.
mkdir
(
p
)
def
rmdir
(
self
,
path
):
p
=
self
.
translate
(
path
)
return
os
.
rmdir
(
p
)
# utility methods
def
normalize
(
self
,
path
):
# watch for the ever-sneaky '/+' path element
path
=
regsub
.
gsub
(
'/+'
,
'/'
,
path
)
p
=
self
.
path_module
.
normpath
(
path
)
# remove 'dangling' cdup's.
if
len
(
p
)
>
2
and
p
[:
3
]
==
'/..'
:
p
=
'/'
return
p
def
translate
(
self
,
path
):
# we need to join together three separate
# path components, and do it safely.
# <real_root>/<current_directory>/<path>
# use the operating system's path separator.
path
=
string
.
join
(
string
.
split
(
path
,
'/'
),
os
.
sep
)
p
=
self
.
normalize
(
self
.
path_module
.
join
(
self
.
wd
,
path
))
p
=
self
.
normalize
(
self
.
path_module
.
join
(
self
.
root
,
p
[
1
:]))
return
p
def
longify
(
self
,
(
path
,
stat_info
)):
return
unix_longify
(
path
,
stat_info
)
def
__repr__
(
self
):
return
'<unix-style fs root:%s wd:%s>'
%
(
self
.
root
,
self
.
wd
)
if
os
.
name
==
'posix'
:
class
unix_filesystem
(
os_filesystem
):
pass
class
schizophrenic_unix_filesystem
(
os_filesystem
):
PROCESS_UID
=
os
.
getuid
()
PROCESS_EUID
=
os
.
geteuid
()
PROCESS_GID
=
os
.
getgid
()
PROCESS_EGID
=
os
.
getegid
()
def
__init__
(
self
,
root
,
wd
=
'/'
,
persona
=
(
None
,
None
)):
os_filesystem
.
__init__
(
self
,
root
,
wd
)
self
.
persona
=
persona
def
become_persona
(
self
):
if
self
.
persona
is
not
(
None
,
None
):
uid
,
gid
=
self
.
persona
# the order of these is important!
os
.
setegid
(
gid
)
os
.
seteuid
(
uid
)
def
become_nobody
(
self
):
if
self
.
persona
is
not
(
None
,
None
):
os
.
seteuid
(
self
.
PROCESS_UID
)
os
.
setegid
(
self
.
PROCESS_GID
)
# cwd, cdup, open, listdir
def
cwd
(
self
,
path
):
try
:
self
.
become_persona
()
return
os_filesystem
.
cwd
(
self
,
path
)
finally
:
self
.
become_nobody
()
def
cdup
(
self
,
path
):
try
:
self
.
become_persona
()
return
os_filesystem
.
cdup
(
self
)
finally
:
self
.
become_nobody
()
def
open
(
self
,
filename
,
mode
):
try
:
self
.
become_persona
()
return
os_filesystem
.
open
(
self
,
filename
,
mode
)
finally
:
self
.
become_nobody
()
def
listdir
(
self
,
path
,
long
=
0
):
try
:
self
.
become_persona
()
return
os_filesystem
.
listdir
(
self
,
path
,
long
)
finally
:
self
.
become_nobody
()
# This hasn't been very reliable across different platforms.
# maybe think about a separate 'directory server'.
#
# import posixpath
# import fcntl
# import FCNTL
# import select
# import asyncore
#
# # pipes /bin/ls for directory listings.
# class unix_filesystem (os_filesystem):
# pass
# path_module = posixpath
#
# def listdir (self, path, long=0):
# p = self.translate (path)
# if not long:
# return list_producer (os.listdir (p), 0, None)
# else:
# command = '/bin/ls -l %s' % p
# print 'opening pipe to "%s"' % command
# fd = os.popen (command, 'rt')
# return pipe_channel (fd)
#
# # this is both a dispatcher, _and_ a producer
# class pipe_channel (asyncore.file_dispatcher):
# buffer_size = 4096
#
# def __init__ (self, fd):
# asyncore.file_dispatcher.__init__ (self, fd)
# self.fd = fd
# self.done = 0
# self.data = ''
#
# def handle_read (self):
# if len (self.data) < self.buffer_size:
# self.data = self.data + self.fd.read (self.buffer_size)
# #print '%s.handle_read() => len(self.data) == %d' % (self, len(self.data))
#
# def handle_expt (self):
# #print '%s.handle_expt()' % self
# self.done = 1
#
# def ready (self):
# #print '%s.ready() => %d' % (self, len(self.data))
# return ((len (self.data) > 0) or self.done)
#
# def more (self):
# if self.data:
# r = self.data
# self.data = ''
# elif self.done:
# self.close()
# self.downstream.finished()
# r = ''
# else:
# r = None
# #print '%s.more() => %s' % (self, (r and len(r)))
# return r
# For the 'real' root, we could obtain a list of drives, and then
# use that. Doesn't win32 provide such a 'real' filesystem?
# [yes, I think something like this "\\.\c\windows"]
class
msdos_filesystem
(
os_filesystem
):
def
longify
(
self
,
(
path
,
stat_info
)):
return
msdos_longify
(
path
,
stat_info
)
# A merged filesystem will let you plug other filesystems together.
# We really need the equivalent of a 'mount' capability - this seems
# to be the most general idea. So you'd use a 'mount' method to place
# another filesystem somewhere in the hierarchy.
# Note: this is most likely how I will handle ~user directories
# with the http server.
class
merged_filesystem
:
def
__init__
(
self
,
*
fsys
):
pass
# this matches the output of NT's ftp server (when in
# MSDOS mode) exactly.
def
msdos_longify
(
file
,
stat_info
):
if
stat
.
S_ISDIR
(
stat_info
[
stat
.
ST_MODE
]):
dir
=
'<DIR>'
else
:
dir
=
' '
date
=
msdos_date
(
stat_info
[
stat
.
ST_MTIME
])
return
'%s %s %8d %s'
%
(
date
,
dir
,
stat_info
[
stat
.
ST_SIZE
],
file
)
def
msdos_date
(
t
):
try
:
info
=
time
.
gmtime
(
t
)
except
:
info
=
time
.
gmtime
(
0
)
# year, month, day, hour, minute, second, ...
if
info
[
3
]
>
11
:
merid
=
'PM'
info
[
3
]
=
info
[
3
]
-
12
else
:
merid
=
'AM'
return
'%02d-%02d-%02d %02d:%02d%s'
%
(
info
[
1
],
info
[
2
],
info
[
0
]
%
100
,
info
[
3
],
info
[
4
],
merid
)
months
=
[
'Jan'
,
'Feb'
,
'Mar'
,
'Apr'
,
'May'
,
'Jun'
,
'Jul'
,
'Aug'
,
'Sep'
,
'Oct'
,
'Nov'
,
'Dec'
]
mode_table
=
{
'0'
:
'---'
,
'1'
:
'--x'
,
'2'
:
'-w-'
,
'3'
:
'-wx'
,
'4'
:
'r--'
,
'5'
:
'r-x'
,
'6'
:
'rw-'
,
'7'
:
'rwx'
}
import
time
def
unix_longify
(
file
,
stat_info
):
# for now, only pay attention to the lower bits
mode
=
(
'%o'
%
stat_info
[
stat
.
ST_MODE
])[
-
3
:]
mode
=
string
.
join
(
map
(
lambda
x
:
mode_table
[
x
],
mode
),
''
)
if
stat
.
S_ISDIR
(
stat_info
[
stat
.
ST_MODE
]):
dirchar
=
'd'
else
:
dirchar
=
'-'
date
=
ls_date
(
long
(
time
.
time
()),
stat_info
[
stat
.
ST_MTIME
])
return
'%s%s %3d %-8d %-8d %8d %s %s'
%
(
dirchar
,
mode
,
stat_info
[
stat
.
ST_NLINK
],
stat_info
[
stat
.
ST_UID
],
stat_info
[
stat
.
ST_GID
],
stat_info
[
stat
.
ST_SIZE
],
date
,
file
)
# Emulate the unix 'ls' command's date field.
# it has two formats - if the date is more than 180
# days in the past, then it's like this:
# Oct 19 1995
# otherwise, it looks like this:
# Oct 19 17:33
def
ls_date
(
now
,
t
):
try
:
info
=
time
.
gmtime
(
t
)
except
:
info
=
time
.
gmtime
(
0
)
# 15,600,000 == 86,400 * 180
if
(
now
-
t
)
>
15600000
:
return
'%s %2d %d'
%
(
months
[
info
[
1
]
-
1
],
info
[
2
],
info
[
0
]
)
else
:
return
'%s %2d %02d:%02d'
%
(
months
[
info
[
1
]
-
1
],
info
[
2
],
info
[
3
],
info
[
4
]
)
# ===========================================================================
# Producers
# ===========================================================================
class
list_producer
:
def
__init__
(
self
,
file_list
,
long
,
longify
):
self
.
file_list
=
file_list
self
.
long
=
long
self
.
longify
=
longify
self
.
done
=
0
def
ready
(
self
):
if
len
(
self
.
file_list
):
return
1
else
:
if
not
self
.
done
:
self
.
done
=
1
return
0
return
(
len
(
self
.
file_list
)
>
0
)
# this should do a pushd/popd
def
more
(
self
):
if
not
self
.
file_list
:
return
''
else
:
# do a few at a time
bunch
=
self
.
file_list
[:
50
]
if
self
.
long
:
bunch
=
map
(
self
.
longify
,
bunch
)
self
.
file_list
=
self
.
file_list
[
50
:]
return
string
.
joinfields
(
bunch
,
'
\
r
\
n
'
)
+
'
\
r
\
n
'
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment